# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Implementations of different data feeders to provide data for TF trainer (deprecated).

This module and all its submodules are deprecated. See
[contrib/learn/README.md](https://www.tensorflow.org/code/tensorflow/contrib/learn/README.md)
for migration instructions.
"""

# TODO(ipolosukhin): Replace this module with feed-dict queue runners & queues.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import itertools
import math

import numpy as np
import six
from six.moves import xrange  # pylint: disable=redefined-builtin

from tensorflow.python.framework import dtypes
from tensorflow.python.framework import tensor_util
from tensorflow.python.ops import array_ops
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.util.deprecation import deprecated

# pylint: disable=g-multiple-import,g-bad-import-order
from .pandas_io import HAS_PANDAS, extract_pandas_data, extract_pandas_matrix, extract_pandas_labels
from .dask_io import HAS_DASK, extract_dask_data, extract_dask_labels

# pylint: enable=g-multiple-import,g-bad-import-order


def _get_in_out_shape(x_shape, y_shape, n_classes, batch_size=None):
  """Returns shape for input and output of the data feeder."""
  x_is_dict, y_is_dict = isinstance(
      x_shape, dict), y_shape is not None and isinstance(y_shape, dict)
  if y_is_dict and n_classes is not None:
    assert isinstance(n_classes, dict)

  if batch_size is None:
    batch_size = list(x_shape.values())[0][0] if x_is_dict else x_shape[0]
  elif batch_size <= 0:
    raise ValueError('Invalid batch_size %d.' % batch_size)

  if x_is_dict:
    input_shape = {}
    for k, v in list(x_shape.items()):
      input_shape[k] = [batch_size] + (list(v[1:]) if len(v) > 1 else [1])
  else:
    x_shape = list(x_shape[1:]) if len(x_shape) > 1 else [1]
    input_shape = [batch_size] + x_shape

  if y_shape is None:
    return input_shape, None, batch_size

  def out_el_shape(out_shape, num_classes):
    out_shape = list(out_shape[1:]) if len(out_shape) > 1 else []
    # Skip first dimension if it is 1.
    if out_shape and out_shape[0] == 1:
      out_shape = out_shape[1:]
    if num_classes is not None and num_classes > 1:
      return [batch_size] + out_shape + [num_classes]
    else:
      return [batch_size] + out_shape

  if not y_is_dict:
    output_shape = out_el_shape(y_shape, n_classes)
  else:
    output_shape = dict([(k,
                          out_el_shape(v, n_classes[k]
                                       if n_classes is not None and
                                       k in n_classes else None))
                         for k, v in list(y_shape.items())])

  return input_shape, output_shape, batch_size
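
# Illustrative example (not part of the original API surface): for a dense
# `x` of shape (1000, 28, 28), labels of shape (1000,) and `n_classes=10`,
# the shapes work out as
#
#   _get_in_out_shape((1000, 28, 28), (1000,), 10, batch_size=32)
#   # -> ([32, 28, 28], [32, 10], 32)
#
# i.e. the sample dimension is replaced by `batch_size`, and a trailing class
# dimension is appended when `n_classes > 1`.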


def _data_type_filter(x, y):
  """Filter data types into acceptable format."""
  if HAS_DASK:
    x = extract_dask_data(x)
    if y is not None:
      y = extract_dask_labels(y)
  if HAS_PANDAS:
    x = extract_pandas_data(x)
    if y is not None:
      y = extract_pandas_labels(y)
  return x, y


def _is_iterable(x):
  return hasattr(x, 'next') or hasattr(x, '__next__')


@deprecated(None, 'Please use tensorflow/transform or tf.data.')
def setup_train_data_feeder(x,
                            y,
                            n_classes,
                            batch_size=None,
                            shuffle=True,
                            epochs=None):
  """Creates a data feeder to sample inputs from a dataset.

  If `x` and `y` are iterators, a `StreamingDataFeeder` is used.

  Args:
    x: numpy, pandas or Dask matrix or dictionary of aforementioned. Also
      supports iterables.
    y: numpy, pandas or Dask array or dictionary of aforementioned. Also
      supports iterables.
    n_classes: number of classes. Must be `None` or same type as `y`. If `y`
      is a `dict` (or an iterable that returns dicts), `n_classes` must be a
      `dict` such that `n_classes[key]` is the number of classes for `y[key]`.
    batch_size: size to split data into parts. Must be >= 1.
    shuffle: Whether to shuffle the inputs.
    epochs: Number of epochs to run.

  Returns:
    DataFeeder object that returns training data.

  Raises:
    ValueError: if one of `x` and `y` is iterable and the other is not.
  """
  x, y = _data_type_filter(x, y)
  if HAS_DASK:
    # pylint: disable=g-import-not-at-top
    import dask.dataframe as dd
    if (isinstance(x, (dd.Series, dd.DataFrame)) and
        (y is None or isinstance(y, (dd.Series, dd.DataFrame)))):
      data_feeder_cls = DaskDataFeeder
    else:
      data_feeder_cls = DataFeeder
  else:
    data_feeder_cls = DataFeeder

  if _is_iterable(x):
    if y is not None and not _is_iterable(y):
      raise ValueError('Both x and y should be iterators for '
                       'streaming learning to work.')
    return StreamingDataFeeder(x, y, n_classes, batch_size)
  return data_feeder_cls(
      x, y, n_classes, batch_size, shuffle=shuffle, epochs=epochs)
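
# Illustrative usage (a sketch; `x_train`/`y_train` are hypothetical in-memory
# numpy arrays):
#
#   feeder = setup_train_data_feeder(x_train, y_train, n_classes=10,
#                                    batch_size=32, epochs=5)
#   inp, out = feeder.input_builder()
#   feed_fn = feeder.get_feed_dict_fn()
#
# Each call to `feed_fn()` then yields a feed_dict for one mini-batch.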


def _batch_data(x, batch_size=None):
  """Batches elements from iterator `x` into np.matrix chunks of `batch_size`."""
  if (batch_size is not None) and (batch_size <= 0):
    raise ValueError('Invalid batch_size %d.' % batch_size)

  x_first_el = six.next(x)
  x = itertools.chain([x_first_el], x)

  chunk = dict([(k, []) for k in list(x_first_el.keys())]) if isinstance(
      x_first_el, dict) else []
  chunk_filled = False
  for data in x:
    if isinstance(data, dict):
      for k, v in list(data.items()):
        chunk[k].append(v)
        if (batch_size is not None) and (len(chunk[k]) >= batch_size):
          chunk[k] = np.matrix(chunk[k])
          chunk_filled = True
      if chunk_filled:
        yield chunk
        chunk = dict([(k, []) for k in list(x_first_el.keys())]) if isinstance(
            x_first_el, dict) else []
        chunk_filled = False
    else:
      chunk.append(data)
      if (batch_size is not None) and (len(chunk) >= batch_size):
        yield np.matrix(chunk)
        chunk = []

  # Yield the remainder (possibly a partial batch).
  if isinstance(x_first_el, dict):
    for k, v in list(data.items()):
      chunk[k] = np.matrix(chunk[k])
    yield chunk
  else:
    yield np.matrix(chunk)


@deprecated(None, 'Please use tensorflow/transform or tf.data.')
def setup_predict_data_feeder(x, batch_size=None):
  """Returns an iterable for feeding into predict step.

  Args:
    x: numpy, pandas, Dask array or dictionary of aforementioned. Also supports
      iterable.
    batch_size: Size of batches to split data into. If `None`, returns one
      batch of full size.

  Returns:
    List or iterator (or dictionary thereof) of parts of data to predict on.

  Raises:
    ValueError: if `batch_size` <= 0.
  """
  if HAS_DASK:
    x = extract_dask_data(x)
  if HAS_PANDAS:
    x = extract_pandas_data(x)
  if _is_iterable(x):
    return _batch_data(x, batch_size)
  if len(x.shape) == 1:
    x = np.reshape(x, (-1, 1))
  if batch_size is not None:
    if batch_size <= 0:
      raise ValueError('Invalid batch_size %d.' % batch_size)
    n_batches = int(math.ceil(float(len(x)) / batch_size))
    return [x[i * batch_size:(i + 1) * batch_size] for i in xrange(n_batches)]
  return [x]
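
# Illustrative: splitting a (10, 3) array with `batch_size=4` yields three
# chunks of 4, 4 and 2 rows:
#
#   batches = setup_predict_data_feeder(np.zeros((10, 3)), batch_size=4)
#   # len(batches) == 3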
272 """ 273 if HAS_PANDAS: 274 import pandas as pd # pylint: disable=g-import-not-at-top 275 if isinstance(data, pd.Series) or isinstance(data, pd.DataFrame): 276 return data.iloc[iloc] 277 return data[iloc] 278 279 280def _check_dtype(dtype): 281 if dtypes.as_dtype(dtype) == dtypes.float64: 282 logging.warn( 283 'float64 is not supported by many models, consider casting to float32.') 284 return dtype 285 286 287class DataFeeder(object): 288 """Data feeder is an example class to sample data for TF trainer. 289 290 THIS CLASS IS DEPRECATED. See 291 [contrib/learn/README.md](https://www.tensorflow.org/code/tensorflow/contrib/learn/README.md) 292 for general migration instructions. 293 """ 294 295 @deprecated(None, 'Please use tensorflow/transform or tf.data.') 296 def __init__(self, 297 x, 298 y, 299 n_classes, 300 batch_size=None, 301 shuffle=True, 302 random_state=None, 303 epochs=None): 304 """Initializes a DataFeeder instance. 305 306 Args: 307 x: One feature sample which can either Nd numpy matrix of shape 308 `[n_samples, n_features, ...]` or dictionary of Nd numpy matrix. 309 y: label vector, either floats for regression or class id for 310 classification. If matrix, will consider as a sequence of labels. 311 Can be `None` for unsupervised setting. Also supports dictionary of 312 labels. 313 n_classes: Number of classes, 0 and 1 are considered regression, `None` 314 will pass through the input labels without one-hot conversion. Also, if 315 `y` is `dict`, then `n_classes` must be `dict` such that 316 `n_classes[key] = n_classes for label y[key]`, `None` otherwise. 317 batch_size: Mini-batch size to accumulate samples in one mini batch. 318 shuffle: Whether to shuffle `x`. 319 random_state: Numpy `RandomState` object to reproduce sampling. 320 epochs: Number of times to iterate over input data before raising 321 `StopIteration` exception. 322 323 Attributes: 324 x: Input features (ndarray or dictionary of ndarrays). 325 y: Input label (ndarray or dictionary of ndarrays). 326 n_classes: Number of classes (if `None`, pass through indices without 327 one-hot conversion). 328 batch_size: Mini-batch size to accumulate. 329 input_shape: Shape of the input (or dictionary of shapes). 330 output_shape: Shape of the output (or dictionary of shapes). 331 input_dtype: DType of input (or dictionary of shapes). 332 output_dtype: DType of output (or dictionary of shapes. 333 """ 334 x_is_dict, y_is_dict = isinstance( 335 x, dict), y is not None and isinstance(y, dict) 336 if isinstance(y, list): 337 y = np.array(y) 338 339 self._x = dict([(k, check_array(v, v.dtype)) for k, v in list(x.items()) 340 ]) if x_is_dict else check_array(x, x.dtype) 341 self._y = None if y is None else (dict( 342 [(k, check_array(v, v.dtype)) for k, v in list(y.items())]) 343 if y_is_dict else check_array(y, y.dtype)) 344 345 # self.n_classes is not None means we're converting raw target indices 346 # to one-hot. 


class DataFeeder(object):
  """Data feeder is an example class to sample data for TF trainer.

  THIS CLASS IS DEPRECATED. See
  [contrib/learn/README.md](https://www.tensorflow.org/code/tensorflow/contrib/learn/README.md)
  for general migration instructions.
  """

  @deprecated(None, 'Please use tensorflow/transform or tf.data.')
  def __init__(self,
               x,
               y,
               n_classes,
               batch_size=None,
               shuffle=True,
               random_state=None,
               epochs=None):
    """Initializes a DataFeeder instance.

    Args:
      x: One feature sample which can be either an Nd numpy matrix of shape
        `[n_samples, n_features, ...]` or a dictionary of Nd numpy matrices.
      y: label vector, either floats for regression or class ids for
        classification. If matrix, will consider as a sequence of labels.
        Can be `None` for unsupervised setting. Also supports dictionary of
        labels.
      n_classes: Number of classes, 0 and 1 are considered regression, `None`
        will pass through the input labels without one-hot conversion. Also, if
        `y` is `dict`, then `n_classes` must be `dict` such that
        `n_classes[key]` is the number of classes for label `y[key]`, `None`
        otherwise.
      batch_size: Mini-batch size to accumulate samples in one mini batch.
      shuffle: Whether to shuffle `x`.
      random_state: Numpy `RandomState` object to reproduce sampling.
      epochs: Number of times to iterate over input data before raising
        `StopIteration` exception.

    Attributes:
      x: Input features (ndarray or dictionary of ndarrays).
      y: Input label (ndarray or dictionary of ndarrays).
      n_classes: Number of classes (if `None`, pass through indices without
        one-hot conversion).
      batch_size: Mini-batch size to accumulate.
      input_shape: Shape of the input (or dictionary of shapes).
      output_shape: Shape of the output (or dictionary of shapes).
      input_dtype: DType of input (or dictionary of dtypes).
      output_dtype: DType of output (or dictionary of dtypes).
    """
    x_is_dict, y_is_dict = isinstance(
        x, dict), y is not None and isinstance(y, dict)
    if isinstance(y, list):
      y = np.array(y)

    self._x = dict([(k, check_array(v, v.dtype)) for k, v in list(x.items())
                   ]) if x_is_dict else check_array(x, x.dtype)
    self._y = None if y is None else (dict(
        [(k, check_array(v, v.dtype)) for k, v in list(y.items())])
                                      if y_is_dict else check_array(y, y.dtype))

    # self.n_classes is not None means we're converting raw target indices
    # to one-hot.
    if n_classes is not None:
      if not y_is_dict:
        y_dtype = (
            np.int64 if n_classes is not None and n_classes > 1 else np.float32)
        self._y = (None if y is None else check_array(y, dtype=y_dtype))

    self.n_classes = n_classes
    self.max_epochs = epochs

    x_shape = dict([(k, v.shape) for k, v in list(self._x.items())
                   ]) if x_is_dict else self._x.shape
    y_shape = dict([(k, v.shape) for k, v in list(self._y.items())
                   ]) if y_is_dict else None if y is None else self._y.shape

    self.input_shape, self.output_shape, self._batch_size = _get_in_out_shape(
        x_shape, y_shape, n_classes, batch_size)

    # Input dtype matches dtype of x.
    self._input_dtype = (
        dict([(k, _check_dtype(v.dtype)) for k, v in list(self._x.items())])
        if x_is_dict else _check_dtype(self._x.dtype))

    # self._output_dtype == np.float32 when y is None
    self._output_dtype = (
        dict([(k, _check_dtype(v.dtype)) for k, v in list(self._y.items())])
        if y_is_dict else (_check_dtype(self._y.dtype)
                           if y is not None else np.float32))

    # self.n_classes is None means we're passing in raw target indices
    if n_classes is not None and y_is_dict:
      for key in list(n_classes.keys()):
        if key in self._output_dtype:
          self._output_dtype[key] = np.float32

    self._shuffle = shuffle
    self.random_state = np.random.RandomState(
        42) if random_state is None else random_state

    if x_is_dict:
      num_samples = list(self._x.values())[0].shape[0]
    elif tensor_util.is_tensor(self._x):
      num_samples = self._x.shape[
          0].value  # shape will be a Dimension, extract an int
    else:
      num_samples = self._x.shape[0]

    if self._shuffle:
      self.indices = self.random_state.permutation(num_samples)
    else:
      self.indices = np.array(range(num_samples))
    self.offset = 0
    self.epoch = 0
    self._epoch_placeholder = None
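
  # Illustrative (a sketch, not part of the original class): with `x` of shape
  # (1000, 10), integer labels `y` of shape (1000,) and `n_classes=5`,
  #
  #   feeder = DataFeeder(x, y, n_classes=5, batch_size=128)
  #   feeder.input_shape   # [128, 10]
  #   feeder.output_shape  # [128, 5] -- labels are one-hot encoded on the fly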
440 """ 441 442 def get_placeholder(shape, dtype, name_prepend): 443 if shape is None: 444 return None 445 if isinstance(shape, dict): 446 placeholder = {} 447 for key in list(shape.keys()): 448 placeholder[key] = array_ops.placeholder( 449 dtypes.as_dtype(dtype[key]), [None] + shape[key][1:], 450 name=name_prepend + '_' + key) 451 else: 452 placeholder = array_ops.placeholder( 453 dtypes.as_dtype(dtype), [None] + shape[1:], name=name_prepend) 454 return placeholder 455 456 self._input_placeholder = get_placeholder(self.input_shape, 457 self._input_dtype, 'input') 458 self._output_placeholder = get_placeholder(self.output_shape, 459 self._output_dtype, 'output') 460 return self._input_placeholder, self._output_placeholder 461 462 def set_placeholders(self, input_placeholder, output_placeholder): 463 """Sets placeholders for this data feeder. 464 465 Args: 466 input_placeholder: Placeholder for `x` variable. Should match shape 467 of the examples in the x dataset. 468 output_placeholder: Placeholder for `y` variable. Should match 469 shape of the examples in the y dataset. Can be `None`. 470 """ 471 self._input_placeholder = input_placeholder 472 self._output_placeholder = output_placeholder 473 474 def get_feed_params(self): 475 """Function returns a `dict` with data feed params while training. 476 477 Returns: 478 A `dict` with data feed params while training. 479 """ 480 return { 481 'epoch': self.epoch, 482 'offset': self.offset, 483 'batch_size': self._batch_size 484 } 485 486 def get_feed_dict_fn(self): 487 """Returns a function that samples data into given placeholders. 488 489 Returns: 490 A function that when called samples a random subset of batch size 491 from `x` and `y`. 492 """ 493 x_is_dict, y_is_dict = isinstance( 494 self._x, dict), self._y is not None and isinstance(self._y, dict) 495 496 # Assign input features from random indices. 497 def extract(data, indices): 498 return (np.array(_access(data, indices)).reshape((indices.shape[0], 1)) 499 if len(data.shape) == 1 else _access(data, indices)) 500 501 # assign labels from random indices 502 def assign_label(data, shape, dtype, n_classes, indices): 503 shape[0] = indices.shape[0] 504 out = np.zeros(shape, dtype=dtype) 505 for i in xrange(out.shape[0]): 506 sample = indices[i] 507 # self.n_classes is None means we're passing in raw target indices 508 if n_classes is None: 509 out[i] = _access(data, sample) 510 else: 511 if n_classes > 1: 512 if len(shape) == 2: 513 out.itemset((i, int(_access(data, sample))), 1.0) 514 else: 515 for idx, value in enumerate(_access(data, sample)): 516 out.itemset(tuple([i, idx, value]), 1.0) 517 else: 518 out[i] = _access(data, sample) 519 return out 520 521 def _feed_dict_fn(): 522 """Function that samples data into given placeholders.""" 523 if self.max_epochs is not None and self.epoch + 1 > self.max_epochs: 524 raise StopIteration 525 assert self._input_placeholder is not None 526 feed_dict = {} 527 if self._epoch_placeholder is not None: 528 feed_dict[self._epoch_placeholder.name] = [self.epoch] 529 530 # Take next batch of indices. 

  def get_feed_dict_fn(self):
    """Returns a function that samples data into given placeholders.

    Returns:
      A function that when called samples a random subset of batch size
      from `x` and `y`.
    """
    x_is_dict, y_is_dict = isinstance(
        self._x, dict), self._y is not None and isinstance(self._y, dict)

    # Assign input features from random indices.
    def extract(data, indices):
      return (np.array(_access(data, indices)).reshape((indices.shape[0], 1))
              if len(data.shape) == 1 else _access(data, indices))

    # assign labels from random indices
    def assign_label(data, shape, dtype, n_classes, indices):
      shape[0] = indices.shape[0]
      out = np.zeros(shape, dtype=dtype)
      for i in xrange(out.shape[0]):
        sample = indices[i]
        # self.n_classes is None means we're passing in raw target indices
        if n_classes is None:
          out[i] = _access(data, sample)
        else:
          if n_classes > 1:
            if len(shape) == 2:
              out.itemset((i, int(_access(data, sample))), 1.0)
            else:
              for idx, value in enumerate(_access(data, sample)):
                out.itemset(tuple([i, idx, value]), 1.0)
          else:
            out[i] = _access(data, sample)
      return out

    def _feed_dict_fn():
      """Function that samples data into given placeholders."""
      if self.max_epochs is not None and self.epoch + 1 > self.max_epochs:
        raise StopIteration
      assert self._input_placeholder is not None
      feed_dict = {}
      if self._epoch_placeholder is not None:
        feed_dict[self._epoch_placeholder.name] = [self.epoch]

      # Take next batch of indices.
      x_len = list(
          self._x.values())[0].shape[0] if x_is_dict else self._x.shape[0]
      end = min(x_len, self.offset + self._batch_size)
      batch_indices = self.indices[self.offset:end]

      # adding input placeholder
      feed_dict.update(
          dict([(self._input_placeholder[k].name, extract(v, batch_indices))
                for k, v in list(self._x.items())]) if x_is_dict else {
                    self._input_placeholder.name:
                        extract(self._x, batch_indices)
                })

      # move offset and reset it if necessary
      self.offset += self._batch_size
      if self.offset >= x_len:
        self.indices = self.random_state.permutation(
            x_len) if self._shuffle else np.array(range(x_len))
        self.offset = 0
        self.epoch += 1

      # return early if there are no labels
      if self._output_placeholder is None:
        return feed_dict

      # adding output placeholders
      if y_is_dict:
        for k, v in list(self._y.items()):
          n_classes = (self.n_classes[k] if k in self.n_classes else
                       None) if self.n_classes is not None else None
          shape, dtype = self.output_shape[k], self._output_dtype[k]
          feed_dict.update({
              self._output_placeholder[k].name:
                  assign_label(v, shape, dtype, n_classes, batch_indices)
          })
      else:
        shape, dtype, n_classes = (self.output_shape, self._output_dtype,
                                   self.n_classes)
        feed_dict.update({
            self._output_placeholder.name:
                assign_label(self._y, shape, dtype, n_classes, batch_indices)
        })

      return feed_dict

    return _feed_dict_fn
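
# Illustrative: for `n_classes=3` and a batch of integer labels [2, 0],
# `assign_label` above produces the one-hot matrix
#
#   [[0., 0., 1.],
#    [1., 0., 0.]]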


class StreamingDataFeeder(DataFeeder):
  """Data feeder for TF trainer that reads data from iterator.

  THIS CLASS IS DEPRECATED. See
  [contrib/learn/README.md](https://www.tensorflow.org/code/tensorflow/contrib/learn/README.md)
  for general migration instructions.

  A streaming data feeder allows reading data as it arrives from disk or from
  elsewhere. It is customary to have these iterators rotate infinitely over
  the dataset, so that the trainer controls how much to learn.
  """

  def __init__(self, x, y, n_classes, batch_size):
    """Initializes a StreamingDataFeeder instance.

    Args:
      x: iterator, each element of which is one feature sample. A sample can
        be an Nd numpy matrix or a dictionary of Nd numpy matrices.
      y: iterator, each element of which is one label sample. A sample can be
        an Nd numpy matrix or a dictionary of Nd numpy matrices with 1 or many
        classes / regression values.
      n_classes: indicator of how many classes the corresponding label sample
        has, for the purposes of one-hot conversion of the label. In case `y`
        is a dictionary, `n_classes` must be a dictionary (with the same keys
        as `y`) of how many classes there are in each label in `y`. If a key
        is present in `y` and missing in `n_classes`, the value is assumed
        `None` and no one-hot conversion will be applied to the label with
        that key.
      batch_size: Mini batch size to accumulate samples in one batch. If set
        to `None`, assumes that the iterator returns already-batched elements.

    Attributes:
      x: input features (or dictionary of input features).
      y: input label (or dictionary of output features).
      n_classes: number of classes.
      batch_size: mini batch size to accumulate.
      input_shape: shape of the input (can be dictionary depending on `x`).
      output_shape: shape of the output (can be dictionary depending on `y`).
      input_dtype: dtype of input (can be dictionary depending on `x`).
      output_dtype: dtype of output (can be dictionary depending on `y`).
    """
    # pylint: disable=invalid-name,super-init-not-called
    x_first_el = six.next(x)
    self._x = itertools.chain([x_first_el], x)
    if y is not None:
      y_first_el = six.next(y)
      self._y = itertools.chain([y_first_el], y)
    else:
      y_first_el = None
      self._y = None
    self.n_classes = n_classes

    x_is_dict = isinstance(x_first_el, dict)
    y_is_dict = y is not None and isinstance(y_first_el, dict)
    if y_is_dict and n_classes is not None:
      assert isinstance(n_classes, dict)

    # extract shapes for first_elements
    if x_is_dict:
      x_first_el_shape = dict(
          [(k, [1] + list(v.shape)) for k, v in list(x_first_el.items())])
    else:
      x_first_el_shape = [1] + list(x_first_el.shape)

    if y_is_dict:
      y_first_el_shape = dict(
          [(k, [1] + list(v.shape)) for k, v in list(y_first_el.items())])
    elif y is None:
      y_first_el_shape = None
    else:
      y_first_el_shape = (
          [1] + list(y_first_el[0].shape
                     if isinstance(y_first_el, list) else y_first_el.shape))

    self.input_shape, self.output_shape, self._batch_size = _get_in_out_shape(
        x_first_el_shape, y_first_el_shape, n_classes, batch_size)

    # Input dtype of x_first_el.
    if x_is_dict:
      self._input_dtype = dict(
          [(k, _check_dtype(v.dtype)) for k, v in list(x_first_el.items())])
    else:
      self._input_dtype = _check_dtype(x_first_el.dtype)

    # Output dtype of y_first_el.
    def check_y_dtype(el):
      if isinstance(el, np.ndarray):
        return el.dtype
      elif isinstance(el, list):
        return check_y_dtype(el[0])
      else:
        return _check_dtype(np.dtype(type(el)))

    # Output types are floats, due to both softmaxes and regression req.
    if n_classes is not None and (y is None or not y_is_dict) and n_classes > 0:
      self._output_dtype = np.float32
    elif y_is_dict:
      self._output_dtype = dict(
          [(k, check_y_dtype(v)) for k, v in list(y_first_el.items())])
    elif y is None:
      self._output_dtype = None
    else:
      self._output_dtype = check_y_dtype(y_first_el)
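
  # Illustrative construction (a sketch; the generators are hypothetical):
  #
  #   def x_iter():
  #     while True:
  #       yield np.random.rand(10)
  #
  #   def y_iter():
  #     while True:
  #       yield np.random.rand(1)
  #
  #   feeder = StreamingDataFeeder(x_iter(), y_iter(), n_classes=None,
  #                                batch_size=32)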
704 """ 705 706 def init_array(shape, dtype): 707 """Initialize array of given shape or dict of shapes and dtype.""" 708 if shape is None: 709 return None 710 elif isinstance(shape, dict): 711 return dict( 712 [(k, np.zeros(shape[k], dtype[k])) for k in list(shape.keys())]) 713 else: 714 return np.zeros(shape, dtype=dtype) 715 716 def put_data_array(dest, index, source=None, n_classes=None): 717 """Puts data array into container.""" 718 if source is None: 719 dest = dest[:index] 720 elif n_classes is not None and n_classes > 1: 721 if len(self.output_shape) == 2: 722 dest.itemset((index, source), 1.0) 723 else: 724 for idx, value in enumerate(source): 725 dest.itemset(tuple([index, idx, value]), 1.0) 726 else: 727 if len(dest.shape) > 1: 728 dest[index, :] = source 729 else: 730 dest[index] = source[0] if isinstance(source, list) else source 731 return dest 732 733 def put_data_array_or_dict(holder, index, data=None, n_classes=None): 734 """Puts data array or data dictionary into container.""" 735 if holder is None: 736 return None 737 if isinstance(holder, dict): 738 if data is None: 739 data = {k: None for k in holder.keys()} 740 assert isinstance(data, dict) 741 for k in holder.keys(): 742 num_classes = n_classes[k] if (n_classes is not None and 743 k in n_classes) else None 744 holder[k] = put_data_array(holder[k], index, data[k], num_classes) 745 else: 746 holder = put_data_array(holder, index, data, n_classes) 747 return holder 748 749 if self.stopped: 750 raise StopIteration 751 752 inp = init_array(self.input_shape, self._input_dtype) 753 out = init_array(self.output_shape, self._output_dtype) 754 755 for i in xrange(self._batch_size): 756 # Add handling when queue ends. 757 try: 758 next_inp = six.next(self._x) 759 inp = put_data_array_or_dict(inp, i, next_inp, None) 760 except StopIteration: 761 self.stopped = True 762 if i == 0: 763 raise 764 inp = put_data_array_or_dict(inp, i, None, None) 765 out = put_data_array_or_dict(out, i, None, None) 766 break 767 768 if self._y is not None: 769 next_out = six.next(self._y) 770 out = put_data_array_or_dict(out, i, next_out, self.n_classes) 771 772 # creating feed_dict 773 if isinstance(inp, dict): 774 feed_dict = dict([(self._input_placeholder[k].name, inp[k]) 775 for k in list(self._input_placeholder.keys())]) 776 else: 777 feed_dict = {self._input_placeholder.name: inp} 778 if self._y is not None: 779 if isinstance(out, dict): 780 feed_dict.update( 781 dict([(self._output_placeholder[k].name, out[k]) 782 for k in list(self._output_placeholder.keys())])) 783 else: 784 feed_dict.update({self._output_placeholder.name: out}) 785 786 return feed_dict 787 788 return _feed_dict_fn 789 790 791class DaskDataFeeder(object): 792 """Data feeder for that reads data from dask.Series and dask.DataFrame. 793 794 THIS CLASS IS DEPRECATED. See 795 [contrib/learn/README.md](https://www.tensorflow.org/code/tensorflow/contrib/learn/README.md) 796 for general migration instructions. 797 798 Numpy arrays can be serialized to disk and it's possible to do random seeks 799 into them. DaskDataFeeder will remove requirement to have full dataset in the 800 memory and still do random seeks for sampling of batches. 801 """ 802 803 @deprecated(None, 'Please feed input to tf.data to support dask.') 804 def __init__(self, 805 x, 806 y, 807 n_classes, 808 batch_size, 809 shuffle=True, 810 random_state=None, 811 epochs=None): 812 """Initializes a DaskDataFeeder instance. 813 814 Args: 815 x: iterator that returns for each element, returns features. 


class DaskDataFeeder(object):
  """Data feeder that reads data from dask.Series and dask.DataFrame.

  THIS CLASS IS DEPRECATED. See
  [contrib/learn/README.md](https://www.tensorflow.org/code/tensorflow/contrib/learn/README.md)
  for general migration instructions.

  Numpy arrays can be serialized to disk, and it's possible to do random seeks
  into them. DaskDataFeeder removes the requirement to have the full dataset
  in memory and still does random seeks for sampling of batches.
  """

  @deprecated(None, 'Please feed input to tf.data to support dask.')
  def __init__(self,
               x,
               y,
               n_classes,
               batch_size,
               shuffle=True,
               random_state=None,
               epochs=None):
    """Initializes a DaskDataFeeder instance.

    Args:
      x: iterator that, for each element, returns features.
      y: iterator that, for each element, returns 1 or many classes /
        regression values.
      n_classes: indicator of how many classes the label has.
      batch_size: Mini batch size to accumulate.
      shuffle: Whether to shuffle the inputs.
      random_state: random state for RNG. Note that it will mutate, so use an
        int value for this if you want consistently sized batches.
      epochs: Number of epochs to run.

    Attributes:
      x: input features.
      y: input label.
      n_classes: number of classes.
      batch_size: mini batch size to accumulate.
      input_shape: shape of the input.
      output_shape: shape of the output.
      input_dtype: dtype of input.
      output_dtype: dtype of output.

    Raises:
      ValueError: if `x` or `y` are `dict`, as they are not supported currently.
    """

    if isinstance(x, dict) or isinstance(y, dict):
      raise ValueError(
          'DaskDataFeeder does not support dictionaries at the moment.')

    # pylint: disable=invalid-name,super-init-not-called
    import dask.dataframe as dd  # pylint: disable=g-import-not-at-top
    # TODO(terrytangyuan): check x and y dtypes in dask_io like pandas
    self._x = x
    self._y = y
    # save column names
    self._x_columns = list(x.columns)
    if isinstance(y.columns[0], str):
      self._y_columns = list(y.columns)
    else:
      # deal with cases where two DFs have overlapped default numeric colnames
      self._y_columns = len(self._x_columns) + 1
      self._y = self._y.rename(columns={y.columns[0]: self._y_columns})

    # TODO(terrytangyuan): deal with unsupervised cases
    # combine into a data frame
    self.df = dd.multi.concat([self._x, self._y], axis=1)
    self.n_classes = n_classes

    x_count = x.count().compute()[0]
    x_shape = (x_count, len(self._x.columns))
    y_shape = (x_count, len(self._y.columns))
    # TODO(terrytangyuan): Add support for shuffle and epochs.
    self._shuffle = shuffle
    self.epochs = epochs
    self.input_shape, self.output_shape, self._batch_size = _get_in_out_shape(
        x_shape, y_shape, n_classes, batch_size)
    self.sample_fraction = self._batch_size / float(x_count)
    self._input_dtype = _check_dtype(self._x.dtypes[0])
    self._output_dtype = _check_dtype(self._y.dtypes[self._y_columns])
    if random_state is None:
      self.random_state = 66
    else:
      self.random_state = random_state
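
  # Illustrative construction (a sketch; `features_df` and `labels_df` are
  # hypothetical pandas DataFrames):
  #
  #   import dask.dataframe as dd
  #   x = dd.from_pandas(features_df, npartitions=4)
  #   y = dd.from_pandas(labels_df, npartitions=4)
  #   feeder = DaskDataFeeder(x, y, n_classes=2, batch_size=64)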
896 """ 897 898 def _feed_dict_fn(): 899 """Samples data and provides it to placeholders.""" 900 # TODO(ipolosukhin): option for with/without replacement (dev version of 901 # dask) 902 sample = self.df.random_split( 903 [self.sample_fraction, 1 - self.sample_fraction], 904 random_state=self.random_state) 905 inp = extract_pandas_matrix(sample[0][self._x_columns].compute()).tolist() 906 out = extract_pandas_matrix(sample[0][self._y_columns].compute()) 907 # convert to correct dtype 908 inp = np.array(inp, dtype=self._input_dtype) 909 # one-hot encode out for each class for cross entropy loss 910 if HAS_PANDAS: 911 import pandas as pd # pylint: disable=g-import-not-at-top 912 if not isinstance(out, pd.Series): 913 out = out.flatten() 914 out_max = self._y.max().compute().values[0] 915 encoded_out = np.zeros((out.size, out_max + 1), dtype=self._output_dtype) 916 encoded_out[np.arange(out.size), out] = 1 917 return {input_placeholder.name: inp, output_placeholder.name: encoded_out} 918 919 return _feed_dict_fn 920