# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Input pipeline.

Please see the [reading data
how-to](https://tensorflow.org/api_guides/python/reading_data)
for context.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections

from six.moves import xrange  # pylint: disable=redefined-builtin

from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_shape
from tensorflow.python.layers import utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import io_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.summary import summary
from tensorflow.python.training import queue_runner
from tensorflow.python.util import deprecation
from tensorflow.python.util.tf_export import tf_export


# pylint: disable=protected-access
_store_sparse = sparse_ops._add_sparse_to_tensors_map
_store_many_sparse = sparse_ops._add_many_sparse_to_tensors_map
_restore_sparse = sparse_ops._take_many_sparse_from_tensors_map
# pylint: enable=protected-access


@tf_export(
    "io.match_filenames_once",
    v1=["io.match_filenames_once", "train.match_filenames_once"])
@deprecation.deprecated_endpoints("train.match_filenames_once")
def match_filenames_once(pattern, name=None):
  """Save the list of files matching pattern, so it is only computed once.

  NOTE: The order of the files returned can be non-deterministic.

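  For example, a minimal sketch (the glob pattern and session setup are
  illustrative, not part of this API):

  ```python
  filenames = tf.train.match_filenames_once("/tmp/data/*.tfrecord")
  with tf.Session() as sess:
    # The returned variable is a local variable, so it must be initialized.
    sess.run(tf.local_variables_initializer())
    print(sess.run(filenames))
  ```
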
  Args:
    pattern: A file pattern (glob), or 1D tensor of file patterns.
    name: A name for the operations (optional).

  Returns:
    A variable that is initialized to the list of files matching the pattern(s).
  """
  with ops.name_scope(name, "matching_filenames", [pattern]) as name:
    return vs.variable(
        name=name, initial_value=io_ops.matching_files(pattern),
        trainable=False, validate_shape=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])


@tf_export(v1=["train.limit_epochs"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensors(tensor).repeat(num_epochs)`.")
def limit_epochs(tensor, num_epochs=None, name=None):
  """Returns tensor `num_epochs` times and then raises an `OutOfRange` error.

  Note: creates local counter `epochs`. Use `local_variables_initializer()` to
  initialize local variables.

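  For example, a minimal sketch (the constant value is illustrative):

  ```python
  limited = tf.train.limit_epochs(tf.constant(7), num_epochs=2)
  with tf.Session() as sess:
    sess.run(tf.local_variables_initializer())
    print(sess.run(limited))  # ==> 7
    print(sess.run(limited))  # ==> 7
    # A third evaluation raises tf.errors.OutOfRangeError.
  ```
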
  Args:
    tensor: Any `Tensor`.
    num_epochs: A positive integer (optional).  If specified, limits the number
      of times the output tensor may be evaluated.
    name: A name for the operations (optional).

  Returns:
    The input tensor; after `num_epochs` evaluations, an `OutOfRange` error
    is raised.

  Raises:
    ValueError: if `num_epochs` is invalid.
  """
  if num_epochs is None:
    return tensor
  if num_epochs <= 0:
    raise ValueError("num_epochs must be > 0, not %d." % num_epochs)
  with ops.name_scope(name, "limit_epochs", [tensor]) as name:
    zero64 = constant_op.constant(0, dtype=dtypes.int64)
    epochs = vs.variable(
        zero64, name="epochs", trainable=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])
    counter = epochs.count_up_to(num_epochs)
    with ops.control_dependencies([counter]):
      return array_ops.identity(tensor, name=name)


@tf_export(v1=["train.input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(input_tensor).shuffle"
    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def input_producer(input_tensor,
                   element_shape=None,
                   num_epochs=None,
                   shuffle=True,
                   seed=None,
                   capacity=32,
                   shared_name=None,
                   summary_name=None,
                   name=None,
                   cancel_op=None):
  """Output the rows of `input_tensor` to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

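  For example, a sketch that cycles through the rows of a small matrix in
  order (the data and session handling are illustrative):

  ```python
  rows = tf.constant([[1, 2], [3, 4], [5, 6]])
  queue = tf.train.input_producer(rows, shuffle=False)
  row = queue.dequeue()
  with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    print(sess.run(row))  # ==> [1 2]
    coord.request_stop()
    coord.join(threads)
  ```
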
  Args:
    input_tensor: A tensor with the rows to produce. Must be at least
      one-dimensional. Must either have a fully-defined shape, or
      `element_shape` must be defined.
    element_shape: (Optional.) A `TensorShape` representing the shape of a
      row of `input_tensor`, if it cannot be inferred.
    num_epochs: (Optional.) An integer. If specified, `input_producer` produces
      each row of `input_tensor` `num_epochs` times before generating an
      `OutOfRange` error. If not specified, `input_producer` can cycle through
      the rows of `input_tensor` an unlimited number of times.
    shuffle: (Optional.) A boolean. If true, the rows are randomly shuffled
      within each epoch.
    seed: (Optional.) An integer. The seed to use if `shuffle` is true.
    capacity: (Optional.) The capacity of the queue to be used for buffering
      the input.
    shared_name: (Optional.) If set, this queue will be shared under the given
      name across multiple sessions.
    summary_name: (Optional.) If set, a scalar summary for the current queue
      size will be generated, using this name as part of the tag.
    name: (Optional.) A name for the queue.
    cancel_op: (Optional.) Cancel op for the queue.

  Returns:
    A queue with the output rows.  A `QueueRunner` for the queue is
    added to the current `QUEUE_RUNNER` collection of the current
    graph.

  Raises:
    ValueError: If the shape of the input cannot be inferred from the arguments.
    RuntimeError: If called with eager execution enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  if context.executing_eagerly():
    raise RuntimeError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  with ops.name_scope(name, "input_producer", [input_tensor]):
    input_tensor = ops.convert_to_tensor(input_tensor, name="input_tensor")
    element_shape = input_tensor.shape[1:].merge_with(element_shape)
    if not element_shape.is_fully_defined():
      raise ValueError("Either `input_tensor` must have a fully defined shape "
                       "or `element_shape` must be specified")

    if shuffle:
      input_tensor = random_ops.random_shuffle(input_tensor, seed=seed)

    input_tensor = limit_epochs(input_tensor, num_epochs)

    q = data_flow_ops.FIFOQueue(capacity=capacity,
                                dtypes=[input_tensor.dtype.base_dtype],
                                shapes=[element_shape],
                                shared_name=shared_name, name=name)
    enq = q.enqueue_many([input_tensor])
    queue_runner.add_queue_runner(
        queue_runner.QueueRunner(
            q, [enq], cancel_op=cancel_op))
    if summary_name is not None:
      summary.scalar(summary_name,
                     math_ops.cast(q.size(), dtypes.float32) * (1. / capacity))
    return q


@tf_export(v1=["train.string_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(string_tensor).shuffle"
    "(tf.shape(string_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def string_input_producer(string_tensor,
                          num_epochs=None,
                          shuffle=True,
                          seed=None,
                          capacity=32,
                          shared_name=None,
                          name=None,
                          cancel_op=None):
  """Output strings (e.g. filenames) to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

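  For example, a sketch of the usual filename-queue pattern (the filenames
  are illustrative):

  ```python
  filename_queue = tf.train.string_input_producer(
      ["file0.csv", "file1.csv"], shuffle=False)
  reader = tf.TextLineReader()
  key, value = reader.read(filename_queue)
  ```
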
  Args:
    string_tensor: A 1-D string tensor with the strings to produce.
    num_epochs: An integer (optional). If specified, `string_input_producer`
      produces each string from `string_tensor` `num_epochs` times before
      generating an `OutOfRange` error. If not specified,
      `string_input_producer` can cycle through the strings in `string_tensor`
      an unlimited number of times.
    shuffle: Boolean. If true, the strings are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions. All sessions open to the device which has
      this queue will be able to access it via the shared_name. Using this in
      a distributed setting means each name will only be seen by one of the
      sessions which has access to this operation.
    name: A name for the operations (optional).
    cancel_op: Cancel op for the queue (optional).

  Returns:
    A queue with the output strings.  A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  Raises:
    ValueError: If the string_tensor is an empty Python list.  At runtime,
      will fail with an assertion if string_tensor becomes an empty tensor.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  not_null_err = "string_input_producer requires a non-null input tensor"
  if not isinstance(string_tensor, ops.Tensor) and not string_tensor:
    raise ValueError(not_null_err)

  with ops.name_scope(name, "input_producer", [string_tensor]) as name:
    string_tensor = ops.convert_to_tensor(string_tensor, dtype=dtypes.string)
    with ops.control_dependencies([
        control_flow_ops.Assert(
            math_ops.greater(array_ops.size(string_tensor), 0),
            [not_null_err])]):
      string_tensor = array_ops.identity(string_tensor)
    return input_producer(
        input_tensor=string_tensor,
        element_shape=[],
        num_epochs=num_epochs,
        shuffle=shuffle,
        seed=seed,
        capacity=capacity,
        shared_name=shared_name,
        name=name,
        summary_name="fraction_of_%d_full" % capacity,
        cancel_op=cancel_op)


@tf_export(v1=["train.range_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.range(limit).shuffle(limit).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces the integers from 0 to limit-1 in a queue.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

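  For example, a sketch that dequeues shuffled indices from `[0, 10)` (the
  session handling is illustrative):

  ```python
  index_queue = tf.train.range_input_producer(10, shuffle=True, seed=42)
  index = index_queue.dequeue()
  with tf.Session() as sess:
    threads = tf.train.start_queue_runners(sess=sess)
    print(sess.run(index))  # Some integer in [0, 10).
  ```
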
  Args:
    limit: An int32 scalar tensor.
    num_epochs: An integer (optional). If specified, `range_input_producer`
      produces each integer `num_epochs` times before generating an
      OutOfRange error. If not specified, `range_input_producer` can cycle
      through the integers an unlimited number of times.
    shuffle: Boolean. If true, the integers are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A Queue with the output integers.  A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", [limit]) as name:
    range_tensor = math_ops.range(limit)
    return input_producer(
        range_tensor, [], num_epochs, shuffle, seed, capacity,
        shared_name, "fraction_of_%d_full" % capacity, name)


@tf_export(v1=["train.slice_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(tuple(tensor_list)).shuffle"
    "(tf.shape(tensor_list[0], out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces a slice of each `Tensor` in `tensor_list`.

  Implemented using a Queue -- a `QueueRunner` for the Queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

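  For example, a sketch that pairs each image with its label (`images` with
  shape `[N, 32, 32, 3]` and `labels` with shape `[N]` are assumed to
  already exist):

  ```python
  image, label = tf.train.slice_input_producer(
      [images, labels], shuffle=True, seed=1234)
  # `image` has shape [32, 32, 3]; `label` is a scalar.
  ```
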
  Args:
    tensor_list: A list of `Tensor` objects. Every `Tensor` in
      `tensor_list` must have the same size in the first dimension.
    num_epochs: An integer (optional). If specified, `slice_input_producer`
      produces each slice `num_epochs` times before generating
      an `OutOfRange` error. If not specified, `slice_input_producer` can cycle
      through the slices an unlimited number of times.
    shuffle: Boolean. If true, the slices are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A list of tensors, one for each element of `tensor_list`.  If the tensor
    in `tensor_list` has shape `[N, a, b, .., z]`, then the corresponding output
    tensor will have shape `[a, b, ..., z]`.

  Raises:
    ValueError: if `slice_input_producer` produces nothing from `tensor_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", tensor_list):
    tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
    if not tensor_list:
      raise ValueError(
          "Expected at least one tensor in slice_input_producer().")
    range_size = array_ops.shape(tensor_list[0])[0]
    # TODO(josh11b): Add an assertion that the first dimension of
    # everything in TensorList matches. Maybe just check the inferred shapes?
    queue = range_input_producer(range_size, num_epochs=num_epochs,
                                 shuffle=shuffle, seed=seed, capacity=capacity,
                                 shared_name=shared_name)
    index = queue.dequeue()
    output = [array_ops.gather(t, index) for t in tensor_list]
    return output


# Helpers for the batching functions ------------------------------------------


def _flatten(tensor_list_list):
  return [tensor for tensor_list in tensor_list_list for tensor in tensor_list]


class _SparseMetaData(object):
  """Store metadata about a `Tensor`: its sparsity, `map_op`, and rank."""

  def __init__(self, sparse, map_op, rank):
    """Create the metadata.

    Args:
      sparse: Python boolean.
      map_op: The `Operation` that created the `SparseTensorsMap` in question.
        This Op contains information about the underlying Map object and the
        dtype of the original data.
      rank: The statically known rank of the `SparseTensor`.
    """
    self._sparse = sparse
    self._map_op = map_op
    self._rank = tensor_shape.Dimension(rank)

  def __eq__(self, other):
    if self.sparse != other.sparse:
      return False
    if not self.sparse:
      return True
    # If map_ops are not the same, the data source is not the same.
    if (self.map_op is not None) != (other.map_op is not None):
      return False
    if self.map_op != other.map_op:
      return False
    if not self.rank.is_compatible_with(other.rank):
      return False
    return True

  def __ne__(self, other):
    return not self.__eq__(other)

  def __str__(self):
    return "[SparseMetaData(%s, %s, %s)]" % (self.sparse, self.map_op.name,
                                             self.rank)

  def merge_with(self, other):
    if self != other:
      raise ValueError("SparseMetaData objects are incompatible: %s vs. %s"
                       % (self, other))
    if self.sparse:
      self.rank.merge_with(other.rank)
    return self

  @property
  def map_op(self):
    return self._map_op

  @property
  def sparse(self):
    return self._sparse

  @property
  def rank(self):
    return self._rank


def _as_tensor_list(tensors):
  if isinstance(tensors, dict):
    return [tensors[k] for k in sorted(tensors, key=str)]
  else:
    return tensors


def _as_tensor_list_list(tensors_list):
  if not tensors_list:
    raise ValueError("Expected at least one set of tensors")
  if isinstance(tensors_list[0], dict):
    expected_keys = set(tensors_list[0].keys())
    for tensors in tensors_list[1:]:
      if set(tensors.keys()) != expected_keys:
        raise ValueError("All dictionaries in tensors_list must have "
                         "the same keys")
    return [_as_tensor_list(tensors) for tensors in tensors_list]
  else:
    return tensors_list


def _as_original_type(original_tensors, tensor_list):
  if isinstance(original_tensors, dict):
    if len(original_tensors) == 1:
      # tensor_list is bogusly returned as a single tensor if only one tensor
      # was enqueued.  Make it a list again.  See b/28117485.
      tensor_list = [tensor_list]
    return {k: tensor_list[i]
            for i, k in enumerate(sorted(original_tensors, key=str))}
  else:
    return tensor_list


def _store_sparse_tensors(tensor_list, enqueue_many, keep_input,
                          shared_map_ops=None):
  """Store SparseTensors for feeding into batch, etc.

  If `shared_map_ops` is provided, the underlying `SparseTensorsMap` objects
  are reused (shared).  This argument is useful for, e.g., `batch_join`
  where multiple enqueue operations write to the same Queue component,
  and another (dequeue) thread reads from that same location and must then
  restore the associated `SparseTensor` objects.  In this case, the sparse
  restore must have a single `SparseTensorMap` from which to read out the
  handles; so a single `SparseTensorMap` must be shared for storing
  across the multiple enqueue operations.  This sharing is performed by
  calling `_store_sparse_tensors` the first time with `shared_map_ops=None`,
  and then in subsequent times with this value set to the list of `Operation`
  objects created in the first call.

  Args:
    tensor_list: List of `Tensor` and `SparseTensor` objects.
    enqueue_many: Python `Boolean`.
    keep_input: Must be a scalar bool Tensor (not a Python bool). If False,
      don't store.
    shared_map_ops: (optional) List of `Operation` objects from a previous
      call to `_store_sparse_tensors`.  If not `None`, the op types should be
      one of `AddSparseToTensorsMap` or `AddManySparseToTensorsMap` in the
      locations corresponding to `SparseTensors` in `tensor_list`.

  Returns:
    A tuple `(stored_list, sparse_info_list)` where `stored_list` is a list
    of `Tensor` objects (same length as `tensor_list`) and `sparse_info_list`
    is a list of `_SparseMetaData` objects, also of the same length.
  """
  maybe_shared_map_ops = shared_map_ops or [None] * len(tensor_list)

  def _sparse_meta_data(t, storing_op, map_op):
    if not isinstance(t, sparse_tensor.SparseTensor):
      return _SparseMetaData(False, None, None)
    rank = t.dense_shape.shape.with_rank(1).dims[0]
    if enqueue_many:
      rank -= 1
    # If a shared map_op was provided, use that. Otherwise use the name of
    # the operation used to store the SparseTensor.
    return _SparseMetaData(
        sparse=True, map_op=map_op or storing_op, rank=rank)

  def _maybe_store(t, shared_map_op):
    """Store a `SparseTensor`, if necessary."""
    if not isinstance(t, sparse_tensor.SparseTensor):
      return t
    map_op_name = shared_map_op.name if shared_map_op else None
    def _maybe_store_sparse(t, map_op_name, keep_input):
      """Conditionally store a single sparse Tensor."""
      return utils.smart_cond(
          keep_input,
          lambda: _store_sparse(t, shared_name=map_op_name),
          lambda: constant_op.constant(-1, dtypes.int64))
    def _maybe_store_many_sparse(t, map_op_name, keep_input):
      """Conditionally store multiple sparse Tensors."""
      out_tensor = utils.smart_cond(
          keep_input,
          lambda: _store_many_sparse(t, shared_name=map_op_name),
          lambda: -1 * array_ops.ones(array_ops.shape(t)[0:1], dtypes.int64))
      out_tensor.set_shape([None])  # necessary when t.ndims is unknown
      return out_tensor
    def _sparse_values_to_keep(t, keep_input):
      """Convert a per-row `keep_input` vector to a per-value one."""
      # Get the rows of every value in the sparse Tensor.
      row_values = t.indices[:, 0]
      # The value should be kept iff the row should be kept.
      return array_ops.gather(keep_input, row_values)
    if keep_input.shape.ndims == 1:
      t = sparse_ops.sparse_retain(t, _sparse_values_to_keep(t, keep_input))
      store_f = lambda t, name, _: _store_many_sparse(t, shared_name=name)
    elif enqueue_many:
      store_f = _maybe_store_many_sparse
    else:
      store_f = _maybe_store_sparse
    return store_f(t, map_op_name, keep_input)

  stored_list = [
      _maybe_store(t, shared_map_op) for t, shared_map_op
      in zip(tensor_list, maybe_shared_map_ops)]
  # Since the output of `_store{_many}_sparse` is wrapped in a tf.cond `Merge`,
  # we can't just get the Op of the resulting tensor.
  def _sparse_op(stored):
    for input_tensor in stored.op.inputs:
      if input_tensor.op.type in ("AddSparseToTensorsMap",
                                  "AddManySparseToTensorsMap"):
        return input_tensor.op
    # If there was no sparse input, then the original stored Tensor wasn't
    # sparse and we can just return the original Tensor's Op.
    return stored.op
  sparse_info_list = [
      _sparse_meta_data(t, _sparse_op(stored), shared_map_op)
      for t, stored, shared_map_op
      in zip(tensor_list, stored_list, maybe_shared_map_ops)]
  # Expand dims of stored tensors by 1 for proper enqueue shape
  stored_list = [
      array_ops.expand_dims(s, [-1]) if s_info.sparse else s
      for s, s_info in zip(stored_list, sparse_info_list)]
  return stored_list, sparse_info_list


def _store_sparse_tensors_join(tensor_list_list, enqueue_many, keep_input):
  """Store SparseTensors for feeding into batch_join, etc."""
  (s0, sparse_info_list) = _store_sparse_tensors(
      tensor_list_list[0], enqueue_many, keep_input)
  stored_list_list = [s0]
  for tensor_list in tensor_list_list[1:]:
    s, sparse_info_candidate = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input,
        [st.map_op for st in sparse_info_list])
    if sparse_info_list != sparse_info_candidate:
      raise ValueError("Inconsistent SparseTensors list: %s vs. %s"
                       % (tensor_list_list[0], tensor_list))
    sparse_info_list = [
        info.merge_with(candidate)
        for (info, candidate) in zip(sparse_info_list, sparse_info_candidate)]
    stored_list_list.append(s)

  return (stored_list_list, sparse_info_list)


def _restore_sparse_tensors(stored_list, sparse_info_list):
  """Restore SparseTensors after dequeue in batch, batch_join, etc."""
  received_sequence = isinstance(stored_list, collections.Sequence)
  if not received_sequence:
    stored_list = (stored_list,)
  tensors = [
      _restore_sparse(sparse_map_op=info.map_op,
                      sparse_handles=array_ops.squeeze(s, [1]),
                      rank=tensor_shape.dimension_value(info.rank + 1))
      if info.sparse else s
      for (s, info) in zip(stored_list, sparse_info_list)]
  has_st = any(isinstance(x, sparse_tensor.SparseTensor) for x in tensors)
  if has_st:
    t_values = [
        x.values if isinstance(x, sparse_tensor.SparseTensor)
        else x
        for x in tensors]
    with_deps = lambda x: control_flow_ops.with_dependencies(t_values, x)
    ensure_restore_tensors = [
        sparse_tensor.SparseTensor(indices=with_deps(x.indices),
                                   values=with_deps(x.values),
                                   dense_shape=with_deps(x.dense_shape))
        if isinstance(x, sparse_tensor.SparseTensor)
        else with_deps(x)
        for x in tensors]
  else:
    ensure_restore_tensors = tensors
  return ensure_restore_tensors if received_sequence else tensors[0]


def _validate(tensor_list):
  tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
  if not tensor_list:
    raise ValueError("Expected at least one tensor in batch().")
  return tensor_list


def _validate_join(tensor_list_list):
  tensor_list_list = [ops.convert_n_to_tensor_or_indexed_slices(tl)
                      for tl in tensor_list_list]
  if not tensor_list_list:
    raise ValueError("Expected at least one input in batch_join().")
  return tensor_list_list


def _validate_keep_input(keep_input, enqueue_many):
  """Validate `keep_input` argument to conditional batching functions."""
  keep_input = ops.convert_to_tensor(keep_input)
  if keep_input.shape.ndims is None:
    raise ValueError(
        "`keep_input` dimensions must be known at graph construction.")
  if not enqueue_many and keep_input.shape.ndims == 1:
    raise ValueError(
        "`keep_input` cannot be a vector when `enqueue_many=False`.")
  if keep_input.shape.ndims > 1:
    raise ValueError("`keep_input` must be 0 or 1 dimensions.")
  return keep_input


def _dtypes(tensor_list_list):
  all_types = [[t.dtype for t in tl] for tl in tensor_list_list]
  types = all_types[0]
  for other_types in all_types[1:]:
    if other_types != types:
      raise TypeError("Expected types to be consistent: %s vs. %s." %
                      (", ".join(x.name for x in types),
                       ", ".join(x.name for x in other_types)))
  return types


def _merge_shapes(shape_list, enqueue_many):
  shape_list = [tensor_shape.as_shape(s) for s in shape_list]
  if enqueue_many:
    # We want the shapes without the leading batch dimension.
    shape_list = [s.with_rank_at_least(1)[1:] for s in shape_list]
  merged_shape = shape_list[0]
  for s in shape_list[1:]:
    merged_shape.merge_with(s)
  return merged_shape.as_list()


def _shapes(tensor_list_list, shapes, enqueue_many):
  """Calculate and merge the shapes of incoming tensors.

  Args:
    tensor_list_list: List of tensor lists.
    shapes: List of shape tuples corresponding to tensors within the lists.
    enqueue_many: Boolean describing whether shapes will be enqueued as
      batches or individual entries.

  Returns:
    A list of shapes aggregating shape inference info from `tensor_list_list`,
    or returning `shapes` if it is not `None`.

  Raises:
    ValueError: If any of the inferred shapes in `tensor_list_list` lack a
      well-defined rank.
  """
  if shapes is None:
    len0 = len(tensor_list_list[0])

    for tl in tensor_list_list:
      for i in xrange(len0):
        if tl[i].shape.ndims is None:
          raise ValueError("Cannot infer Tensor's rank: %s" % tl[i])

    shapes = [_merge_shapes(
        [tl[i].shape.as_list() for tl in tensor_list_list], enqueue_many)
              for i in xrange(len0)]
  return shapes


def _select_which_to_enqueue(tensor_list, keep_input):
  """Select which examples to enqueue based on vector `keep_input`."""
  select_i = math_ops.cast(keep_input, dtypes.int32)
  tensor_list = [
      data_flow_ops.dynamic_partition(x, select_i, num_partitions=2)[1]
      for x in tensor_list]
  return tensor_list


def _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input):
  """Enqueue `tensor_list_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [enqueue_fn(_select_which_to_enqueue(x, keep_input))
                   for x in tensor_list_list]
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tl),  # pylint:disable=cell-var-from-loop
        control_flow_ops.no_op) for tl in tensor_list_list]
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _enqueue(queue, tensor_list, threads, enqueue_many, keep_input):
  """Enqueue `tensor_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [
        enqueue_fn(_select_which_to_enqueue(tensor_list, keep_input))] * threads
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tensor_list),
        control_flow_ops.no_op)] * threads
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _which_queue(dynamic_pad):
  return (data_flow_ops.PaddingFIFOQueue if dynamic_pad
          else data_flow_ops.FIFOQueue)


def _batch(tensors, batch_size, keep_input, num_threads=1, capacity=32,
           enqueue_many=False, shapes=None, dynamic_pad=False,
           allow_smaller_final_batch=False, shared_name=None,
           name=None):
  """Helper function for `batch` and `maybe_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "batch", list(tensor_list) + [keep_input]) as name:
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    (tensor_list, sparse_info) = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be
# a multiple of len(tensor_list_list)?) parameter, to address the use
# case where you want more parallelism than you can support with different
# readers (either because you don't have that many files or can't
# read that many files in parallel due to the number of seeks required).
# Once this is done, batch() can be written as a call to batch_join().
def _batch_join(tensors_list, batch_size, keep_input, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Helper function for `batch_join` and `maybe_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)


def _shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                   keep_input, num_threads=1, seed=None, enqueue_many=False,
                   shapes=None, allow_smaller_final_batch=False,
                   shared_name=None, name=None):
  """Helper function for `shuffle_batch` and `maybe_shuffle_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "shuffle_batch",
                      list(tensor_list) + [keep_input]) as name:
    if capacity <= min_after_dequeue:
      raise ValueError("capacity %d must be bigger than min_after_dequeue %d."
                       % (capacity, min_after_dequeue))
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list, sparse_info = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


def _shuffle_batch_join(tensors_list, batch_size, capacity,
                        min_after_dequeue, keep_input, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Helper function for `shuffle_batch_join` and `maybe_shuffle_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "shuffle_batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)

# Batching functions ----------------------------------------------------------


@tf_export(v1=["train.batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.batch(batch_size)` (or `padded_batch(...)` if "
    "`dynamic_pad=True`).")
def batch(tensors, batch_size, num_threads=1, capacity=32,
          enqueue_many=False, shapes=None, dynamic_pad=False,
          allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches of tensors in `tensors`.

  The argument `tensors` can be a list or a dictionary of tensors.
  The value returned by the function will be of the same type
  as `tensors`.

  This function is implemented using a queue. A `QueueRunner` for the
  queue is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a single
  example.  An input tensor with shape `[x, y, z]` will be output as a tensor
  with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a batch of
  examples, where the first dimension is indexed by example, and all members of
  `tensors` should have the same size in the first dimension.  If an input
  tensor has shape `[*, x, y, z]`, the output will have shape `[batch_size, x,
  y, z]`.  The `capacity` argument controls how long the prefetching is
  allowed to grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors` have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have shape `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0.  For strings, this padding is
  the empty string.  See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch; otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.

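  For example, a sketch that groups single examples produced upstream (e.g.
  by a reader op) into batches of 32; `image` and `label` are assumed
  single-example tensors:

  ```python
  image_batch, label_batch = tf.train.batch(
      [image, label], batch_size=32, num_threads=4, capacity=1000)
  ```
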
  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`.  The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors` (except if
    the input is a list of one element, then it returns a tensor, not a list).

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch(
      tensors,
      batch_size,
      keep_input=True,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.maybe_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).batch(batch_size)` (or `padded_batch(...)`"
    " if `dynamic_pad=True`).")
def maybe_batch(tensors, keep_input, batch_size, num_threads=1, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Conditionally creates batches of tensors based on `keep_input`.

  See docstring in `batch` for more details.

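  For example, a sketch that batches only the examples whose (assumed
  scalar) `label` is positive:

  ```python
  image_batch, label_batch = tf.train.maybe_batch(
      [image, label], keep_input=label > 0, batch_size=32)
  ```
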
  Args:
    tensors: The list or dictionary of tensors to enqueue.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates `True`, then
      `tensors` are all added to the queue. If it is a vector and `enqueue_many`
      is `True`, then each example is added to the queue only if the
      corresponding value in `keep_input` is `True`. This tensor essentially
      acts as a filtering mechanism.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`.  The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.
  """
  return _batch(
      tensors,
      batch_size,
      keep_input,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False,
               shapes=None, dynamic_pad=False, allow_smaller_final_batch=False,
               shared_name=None, name=None):
  """Runs a list of tensors to fill a queue to create batches of examples.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors.  Each element in the list is treated similarly
  to the `tensors` argument of `tf.train.batch()`.

  WARNING: This function is nondeterministic, since it starts a separate thread
  for each tensor.

  Enqueues a different list of tensors in different threads.
  Implemented using a queue -- a `QueueRunner` for the queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  `len(tensors_list)` threads will be started,
  with thread `i` enqueuing the tensors from
  `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first
  dimension if `enqueue_many` is true.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example. An input tensor `x` will be output as a
  tensor with shape `[batch_size] + x.shape`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension.  The slices of any input tensor
  `x` are treated as examples, and the output tensors will have shape
  `[batch_size] + x.shape[1:]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors_list` have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have value `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0.  For strings, this padding is
  the empty string.  See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch; otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.

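  For example, a sketch with two parallel readers; the `read_example`
  helper, which returns an `(example, label)` tuple for a filename queue,
  is hypothetical:

  ```python
  example_list = [read_example(filename_queue) for _ in range(2)]
  example_batch, label_batch = tf.train.batch_join(
      example_list, batch_size=32, capacity=1000)
  ```
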
  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input=True,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.maybe_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def maybe_batch_join(tensors_list, keep_input, batch_size, capacity=32,
                     enqueue_many=False, shapes=None, dynamic_pad=False,
                     allow_smaller_final_batch=False, shared_name=None,
                     name=None):
  """Runs a list of tensors to conditionally fill a queue to create batches.

  See docstring in `batch_join` for more details.

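  For example, a sketch that filters while batching; `example_list` is as
  in the `batch_join` example above, and `keep` is an assumed scalar bool
  tensor computed per example:

  ```python
  example_batch, label_batch = tf.train.maybe_batch_join(
      example_list, keep_input=keep, batch_size=32)
  ```
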
  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates to `True`,
      then `tensors` are all added to the queue. If it is a vector and
      `enqueue_many` is `True`, then each example is added to the queue only
      if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If `shapes` is not specified and cannot be
      inferred from the elements of `tensors_list`.
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.shuffle_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.shuffle(min_after_dequeue).batch(batch_size)`.")
def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                  num_threads=1, seed=None, enqueue_many=False, shapes=None,
                  allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches by randomly shuffling tensors.

  This function adds the following to the current `Graph`:

  * A shuffling queue into which tensors from `tensors` are enqueued.
  * A `dequeue_many` operation to create batches from the queue.
  * A `QueueRunner` added to the `QUEUE_RUNNER` collection, to enqueue the
    tensors from `tensors`.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a
  single example.  An input tensor with shape `[x, y, z]` will be output
  as a tensor with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a
  batch of examples, where the first dimension is indexed by example,
  and all members of `tensors` should have the same size in the
  first dimension.  If an input tensor has shape `[*, x, y, z]`, the
  output will have shape `[batch_size, x, y, z]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread,
  you are responsible for catching it yourself.

  For example:

  ```python
  # Creates batches of 32 images and 32 labels.
  image_batch, label_batch = tf.train.shuffle_batch(
      [single_image, single_label],
      batch_size=32,
      num_threads=4,
      capacity=50000,
      min_after_dequeue=10000)
  ```
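
  With `tf.data`, a roughly equivalent pipeline is a sketch like the
  following, where `images` and `labels` are illustrative in-memory arrays
  holding the whole dataset (unlike the per-example tensors above):

  ```python
  dataset = (tf.data.Dataset.from_tensor_slices((images, labels))
             .shuffle(buffer_size=10000)  # plays the role of min_after_dequeue
             .batch(32))
  image_batch, label_batch = (
      tf.compat.v1.data.make_one_shot_iterator(dataset).get_next())
  ```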

  *N.B.:* You must ensure that either (i) the `shapes` argument is
  passed, or (ii) all of the tensors in `tensors` have
  fully-defined shapes. A `ValueError` will be raised if neither of
  these conditions holds.

  If `allow_smaller_final_batch` is `True`, a batch smaller than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch; otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    num_threads: The number of threads enqueuing `tensors`.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If `shapes` is not specified and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch(
      tensors,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input=True,
      num_threads=num_threads,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.maybe_shuffle_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).shuffle(min_after_dequeue).batch(batch_size)`"
    ".")
def maybe_shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                        keep_input, num_threads=1, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Creates batches by randomly shuffling conditionally-enqueued tensors.

  See docstring in `shuffle_batch` for more details.

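  Following the deprecation note above, the `tf.data` analogue applies a
  filter before shuffling and batching. A sketch (where `features` and
  `keep_fn` are illustrative placeholders; `keep_fn` must return a scalar
  `tf.bool` and plays the role of `keep_input`):

  ```python
  dataset = (tf.data.Dataset.from_tensor_slices(features)
             .filter(keep_fn)
             .shuffle(buffer_size=min_after_dequeue)
             .batch(batch_size))
  ```
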
  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates to `True`,
      then `tensors` are all added to the queue. If it is a vector and
      `enqueue_many` is `True`, then each example is added to the queue only
      if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.
    num_threads: The number of threads enqueuing `tensors`.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If `shapes` is not specified and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch(
      tensors,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input,
      num_threads=num_threads,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.shuffle_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).shuffle(min_after_dequeue).batch"
    "(batch_size)`.")
def shuffle_batch_join(tensors_list, batch_size, capacity,
                       min_after_dequeue, seed=None, enqueue_many=False,
                       shapes=None, allow_smaller_final_batch=False,
                       shared_name=None, name=None):
  """Creates batches by randomly shuffling tensors.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors.  Each element in the list is treated similarly
  to the `tensors` argument of `tf.train.shuffle_batch()`.

  This version enqueues a different list of tensors in different threads.
  It adds the following to the current `Graph`:

  * A shuffling queue into which tensors from `tensors_list` are enqueued.
  * A `dequeue_many` operation to create batches from the queue.
  * A `QueueRunner` added to the `QUEUE_RUNNER` collection, to enqueue the
    tensors from `tensors_list`.

  `len(tensors_list)` threads will be started, with thread `i` enqueuing
  the tensors from `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first dimension if
  `enqueue_many` is `True`.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example.  An input tensor with shape `[x, y, z]`
  will be output as a tensor with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension.  If an input tensor has shape `[*, x,
  y, z]`, the output will have shape `[batch_size, x, y, z]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread,
  you are responsible for catching it yourself.

  If `allow_smaller_final_batch` is `True`, a batch smaller than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch; otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.

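  Per the deprecation note above, the `tf.data` analogue interleaves several
  sources, then shuffles and batches. A sketch (where `filenames` and
  `parse_fn` are illustrative placeholders):

  ```python
  files = tf.data.Dataset.from_tensor_slices(filenames)
  dataset = (files.interleave(tf.data.TFRecordDataset, cycle_length=4)
             .map(parse_fn)
             .shuffle(buffer_size=min_after_dequeue)
             .batch(batch_size))
  ```
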
  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If `shapes` is not specified and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch_join(
      tensors_list,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input=True,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.maybe_shuffle_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).shuffle(min_after_dequeue)"
    ".batch(batch_size)`.")
def maybe_shuffle_batch_join(tensors_list, batch_size, capacity,
                             min_after_dequeue, keep_input, seed=None,
                             enqueue_many=False, shapes=None,
                             allow_smaller_final_batch=False, shared_name=None,
                             name=None):
  """Creates batches by randomly shuffling conditionally-enqueued tensors.

  See docstring in `shuffle_batch_join` for more details.

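  The suggested `tf.data` replacement combines all four stages. A sketch
  (again with illustrative `filenames` and `keep_fn` placeholders, where
  `keep_fn` plays the role of `keep_input`):

  ```python
  files = tf.data.Dataset.from_tensor_slices(filenames)
  dataset = (files.interleave(tf.data.TFRecordDataset, cycle_length=4)
             .filter(keep_fn)
             .shuffle(buffer_size=min_after_dequeue)
             .batch(batch_size))
  ```
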
  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates to `True`,
      then `tensors` are all added to the queue. If it is a vector and
      `enqueue_many` is `True`, then each example is added to the queue only
      if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If `shapes` is not specified and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch_join(
      tensors_list,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)
