# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Input pipeline.

Please see the [reading data
how-to](https://tensorflow.org/api_guides/python/reading_data)
for context.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections

from six.moves import xrange  # pylint: disable=redefined-builtin

from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_shape
from tensorflow.python.layers import utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import io_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.summary import summary
from tensorflow.python.training import queue_runner
from tensorflow.python.util import deprecation
from tensorflow.python.util.tf_export import tf_export


# pylint: disable=protected-access
_store_sparse = sparse_ops._add_sparse_to_tensors_map
_store_many_sparse = sparse_ops._add_many_sparse_to_tensors_map
_restore_sparse = sparse_ops._take_many_sparse_from_tensors_map
# pylint: enable=protected-access


@tf_export(
    "io.match_filenames_once",
    v1=["io.match_filenames_once", "train.match_filenames_once"])
@deprecation.deprecated_endpoints("train.match_filenames_once")
def match_filenames_once(pattern, name=None):
  """Save the list of files matching pattern, so it is only computed once.

  NOTE: The order of the files returned can be non-deterministic.

  Args:
    pattern: A file pattern (glob), or 1D tensor of file patterns.
    name: A name for the operations (optional).

  Returns:
    A variable that is initialized to the list of files matching the pattern(s).
  """
  with ops.name_scope(name, "matching_filenames", [pattern]) as name:
    return vs.variable(
        name=name, initial_value=io_ops.matching_files(pattern),
        trainable=False, validate_shape=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])


@tf_export(v1=["train.limit_epochs"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensors(tensor).repeat(num_epochs)`.")
def limit_epochs(tensor, num_epochs=None, name=None):
  """Returns tensor `num_epochs` times and then raises an `OutOfRange` error.

  Note: creates local counter `epochs`. Use `local_variables_initializer()`
  to initialize local variables.
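
  For example, a minimal sketch (the constant input and `Session` usage here
  are illustrative):

  ```python
  limited = tf.train.limit_epochs(tf.constant([1, 2, 3]), num_epochs=2)
  with tf.Session() as sess:
    sess.run(tf.local_variables_initializer())  # initializes `epochs`
    # `limited` can be evaluated `num_epochs` times before a
    # `tf.errors.OutOfRangeError` is raised.
  ```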

  Args:
    tensor: Any `Tensor`.
    num_epochs: A positive integer (optional). If specified, limits the number
      of steps the output tensor may be evaluated.
    name: A name for the operations (optional).

  Returns:
    tensor or `OutOfRange`.

  Raises:
    ValueError: if `num_epochs` is invalid.
  """
  if num_epochs is None:
    return tensor
  if num_epochs <= 0:
    raise ValueError("num_epochs must be > 0 not %d." % num_epochs)
  with ops.name_scope(name, "limit_epochs", [tensor]) as name:
    zero64 = constant_op.constant(0, dtype=dtypes.int64)
    epochs = vs.variable(
        zero64, name="epochs", trainable=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])
    counter = epochs.count_up_to(num_epochs)
    with ops.control_dependencies([counter]):
      return array_ops.identity(tensor, name=name)


@tf_export(v1=["train.input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(input_tensor).shuffle"
    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def input_producer(input_tensor,
                   element_shape=None,
                   num_epochs=None,
                   shuffle=True,
                   seed=None,
                   capacity=32,
                   shared_name=None,
                   summary_name=None,
                   name=None,
                   cancel_op=None):
  """Output the rows of `input_tensor` to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    input_tensor: A tensor with the rows to produce. Must be at least
      one-dimensional. Must either have a fully-defined shape, or
      `element_shape` must be defined.
    element_shape: (Optional.) A `TensorShape` representing the shape of a
      row of `input_tensor`, if it cannot be inferred.
    num_epochs: (Optional.) An integer. If specified, `input_producer` produces
      each row of `input_tensor` `num_epochs` times before generating an
      `OutOfRange` error. If not specified, `input_producer` can cycle through
      the rows of `input_tensor` an unlimited number of times.
    shuffle: (Optional.) A boolean. If true, the rows are randomly shuffled
      within each epoch.
    seed: (Optional.) An integer. The seed to use if `shuffle` is true.
    capacity: (Optional.) The capacity of the queue to be used for buffering
      the input.
    shared_name: (Optional.) If set, this queue will be shared under the given
      name across multiple sessions.
    summary_name: (Optional.) If set, a scalar summary for the current queue
      size will be generated, using this name as part of the tag.
    name: (Optional.) A name for the queue.
    cancel_op: (Optional.) Cancel op for the queue.

  Returns:
    A queue with the output rows.  A `QueueRunner` for the queue is
    added to the current `QUEUE_RUNNER` collection of the current
    graph.

  Raises:
    ValueError: If the shape of the input cannot be inferred from the arguments.
    RuntimeError: If called with eager execution enabled.
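
  For example, a minimal sketch (the `rows` tensor is illustrative):

  ```python
  rows = tf.constant([[1, 2], [3, 4], [5, 6]])
  queue = tf.train.input_producer(rows, num_epochs=1, shuffle=False)
  row = queue.dequeue()  # dequeues [1, 2], [3, 4], [5, 6], then OutOfRange
  ```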

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  if context.executing_eagerly():
    raise RuntimeError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  with ops.name_scope(name, "input_producer", [input_tensor]):
    input_tensor = ops.convert_to_tensor(input_tensor, name="input_tensor")
    element_shape = input_tensor.shape[1:].merge_with(element_shape)
    if not element_shape.is_fully_defined():
      raise ValueError("Either `input_tensor` must have a fully defined shape "
                       "or `element_shape` must be specified")

    if shuffle:
      input_tensor = random_ops.random_shuffle(input_tensor, seed=seed)

    input_tensor = limit_epochs(input_tensor, num_epochs)

    q = data_flow_ops.FIFOQueue(capacity=capacity,
                                dtypes=[input_tensor.dtype.base_dtype],
                                shapes=[element_shape],
                                shared_name=shared_name, name=name)
    enq = q.enqueue_many([input_tensor])
    queue_runner.add_queue_runner(
        queue_runner.QueueRunner(
            q, [enq], cancel_op=cancel_op))
    if summary_name is not None:
      summary.scalar(summary_name,
                     math_ops.cast(q.size(), dtypes.float32) * (1. / capacity))
    return q


@tf_export(v1=["train.string_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(string_tensor).shuffle"
    "(tf.shape(string_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def string_input_producer(string_tensor,
                          num_epochs=None,
                          shuffle=True,
                          seed=None,
                          capacity=32,
                          shared_name=None,
                          name=None,
                          cancel_op=None):
  """Output strings (e.g. filenames) to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    string_tensor: A 1-D string tensor with the strings to produce.
    num_epochs: An integer (optional). If specified, `string_input_producer`
      produces each string from `string_tensor` `num_epochs` times before
      generating an `OutOfRange` error. If not specified,
      `string_input_producer` can cycle through the strings in `string_tensor`
      an unlimited number of times.
    shuffle: Boolean. If true, the strings are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions. All sessions open to the device which has
      this queue will be able to access it via the shared_name. Using this in
      a distributed setting means each name will only be seen by one of the
      sessions which has access to this operation.
    name: A name for the operations (optional).
    cancel_op: Cancel op for the queue (optional).

  Returns:
    A queue with the output strings.  A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  Raises:
    ValueError: If the `string_tensor` is an empty Python list. At runtime,
      will fail with an assertion if `string_tensor` becomes an empty tensor.
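
  For example, a minimal sketch (the filenames are illustrative):

  ```python
  filename_queue = tf.train.string_input_producer(
      ["file0.csv", "file1.csv"], num_epochs=1, shuffle=False)
  filename = filename_queue.dequeue()  # typically consumed by a reader op
  ```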

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  not_null_err = "string_input_producer requires a non-null input tensor"
  if not isinstance(string_tensor, ops.Tensor) and not string_tensor:
    raise ValueError(not_null_err)

  with ops.name_scope(name, "input_producer", [string_tensor]) as name:
    string_tensor = ops.convert_to_tensor(string_tensor, dtype=dtypes.string)
    with ops.control_dependencies([
        control_flow_ops.Assert(
            math_ops.greater(array_ops.size(string_tensor), 0),
            [not_null_err])]):
      string_tensor = array_ops.identity(string_tensor)
    return input_producer(
        input_tensor=string_tensor,
        element_shape=[],
        num_epochs=num_epochs,
        shuffle=shuffle,
        seed=seed,
        capacity=capacity,
        shared_name=shared_name,
        name=name,
        summary_name="fraction_of_%d_full" % capacity,
        cancel_op=cancel_op)


@tf_export(v1=["train.range_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.range(limit).shuffle(limit).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces the integers from 0 to limit-1 in a queue.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    limit: An int32 scalar tensor.
    num_epochs: An integer (optional). If specified, `range_input_producer`
      produces each integer `num_epochs` times before generating an
      OutOfRange error. If not specified, `range_input_producer` can cycle
      through the integers an unlimited number of times.
    shuffle: Boolean. If true, the integers are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A Queue with the output integers.  A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", [limit]) as name:
    range_tensor = math_ops.range(limit)
    return input_producer(
        range_tensor, [], num_epochs, shuffle, seed, capacity,
        shared_name, "fraction_of_%d_full" % capacity, name)


@tf_export(v1=["train.slice_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(tuple(tensor_list)).shuffle"
    "(tf.shape(tensor_list[0], out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces a slice of each `Tensor` in `tensor_list`.

  Implemented using a Queue -- a `QueueRunner` for the Queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.
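
  For example, a minimal sketch (`images` and `labels` are illustrative
  tensors whose first dimensions match):

  ```python
  image, label = tf.train.slice_input_producer(
      [images, labels], num_epochs=1, shuffle=True)
  # `image` is a single row of `images`; `label` is the matching row of
  # `labels`. Both are typically passed on to `tf.train.batch`.
  ```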

  Args:
    tensor_list: A list of `Tensor` objects. Every `Tensor` in
      `tensor_list` must have the same size in the first dimension.
    num_epochs: An integer (optional). If specified, `slice_input_producer`
      produces each slice `num_epochs` times before generating
      an `OutOfRange` error. If not specified, `slice_input_producer` can cycle
      through the slices an unlimited number of times.
    shuffle: Boolean. If true, the slices are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A list of tensors, one for each element of `tensor_list`.  If the tensor
    in `tensor_list` has shape `[N, a, b, .., z]`, then the corresponding output
    tensor will have shape `[a, b, ..., z]`.

  Raises:
    ValueError: if `slice_input_producer` produces nothing from `tensor_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", tensor_list):
    tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
    if not tensor_list:
      raise ValueError(
          "Expected at least one tensor in slice_input_producer().")
    range_size = array_ops.shape(tensor_list[0])[0]
    # TODO(josh11b): Add an assertion that the first dimension of
    # everything in TensorList matches. Maybe just check the inferred shapes?
    queue = range_input_producer(range_size, num_epochs=num_epochs,
                                 shuffle=shuffle, seed=seed, capacity=capacity,
                                 shared_name=shared_name)
    index = queue.dequeue()
    output = [array_ops.gather(t, index) for t in tensor_list]
    return output


# Helpers for the batching functions ------------------------------------------


def _flatten(tensor_list_list):
  return [tensor for tensor_list in tensor_list_list for tensor in tensor_list]


class _SparseMetaData(object):
  """Stores information about a `Tensor`: whether it is sparse, its map_op,
  and its rank."""

  def __init__(self, sparse, map_op, rank):
    """Create the metadata.

    Args:
      sparse: Python boolean.
      map_op: The `Operation` that created the `SparseTensorsMap` in question.
        This Op contains information about the underlying Map object and the
        dtype of the original data.
      rank: The statically known rank of the `SparseTensor`.
    """
    self._sparse = sparse
    self._map_op = map_op
    self._rank = tensor_shape.Dimension(rank)

  def __eq__(self, other):
    if self.sparse != other.sparse:
      return False
    if not self.sparse:
      return True
    # If map_ops are not the same, the data source is not the same.
    if (self.map_op is not None) != (other.map_op is not None):
      return False
    if self.map_op != other.map_op:
      return False
    if not self.rank.is_compatible_with(other.rank):
      return False
    return True

  def __ne__(self, other):
    return not self.__eq__(other)

  def __str__(self):
    return "[SparseMetaData(%s, %s, %s)]" % (self.sparse, self.map_op.name,
                                             self.rank)

  def merge_with(self, other):
    if self != other:
      raise ValueError("SparseMetaData objects are incompatible: %s vs. %s"
                       % (self, other))
    if self.sparse:
      self.rank.merge_with(other.rank)
    return self

  @property
  def map_op(self):
    return self._map_op

  @property
  def sparse(self):
    return self._sparse

  @property
  def rank(self):
    return self._rank


def _as_tensor_list(tensors):
  if isinstance(tensors, dict):
    return [tensors[k] for k in sorted(tensors, key=str)]
  else:
    return tensors


def _as_tensor_list_list(tensors_list):
  if not tensors_list:
    raise ValueError("Expected at least one set of tensors")
  if isinstance(tensors_list[0], dict):
    expected_keys = set(tensors_list[0].keys())
    for tensors in tensors_list[1:]:
      if set(tensors.keys()) != expected_keys:
        raise ValueError("All dictionaries in tensors_list must have "
                         "the same keys")
    return [_as_tensor_list(tensors) for tensors in tensors_list]
  else:
    return tensors_list


def _as_original_type(original_tensors, tensor_list):
  if isinstance(original_tensors, dict):
    if len(original_tensors) == 1:
      # tensor_list is bogusly returned as a single tensor if only one tensor
      # was enqueued.  Make it a list again.  See b/28117485.
      tensor_list = [tensor_list]
    return {k: tensor_list[i]
            for i, k in enumerate(sorted(original_tensors, key=str))}
  else:
    return tensor_list


def _store_sparse_tensors(tensor_list, enqueue_many, keep_input,
                          shared_map_ops=None):
  """Store SparseTensors for feeding into batch, etc.

  If `shared_map_ops` is provided, the underlying `SparseTensorsMap` objects
  are reused (shared).  This argument is useful for, e.g., `batch_join`
  where multiple enqueue operations write to the same Queue component,
  and another (dequeue) thread reads from that same location and must then
  restore the associated `SparseTensor` objects.  In this case, the sparse
  restore must have a single `SparseTensorMap` from which to read out the
  handles; so a single `SparseTensorMap` must be shared for storing
  across the multiple enqueue operations.  This sharing is performed by
  calling `_store_sparse_tensors` the first time with `shared_map_ops=None`,
  and then in subsequent times with this value set to the list of `Operation`
  objects created in the first call.

  Args:
    tensor_list: List of `Tensor` and `SparseTensor` objects.
    enqueue_many: Python `Boolean`.
    keep_input: Must be a scalar bool Tensor (not a Python bool). If False,
      don't store.
    shared_map_ops: (optional) List of `Operation` objects from a previous
      call to `_store_sparse_tensors`.  If not `None`, the op types should be
      one of `AddSparseToTensorsMap` or `AddManySparseToTensorsMap` in the
      locations corresponding to `SparseTensors` in `tensor_list`.

  Returns:
    A tuple `(stored_list, sparse_info_list)` where `stored_list` is a list
    of `Tensor` objects (same length as `tensor_list`) and `sparse_info_list`
    is a list of `_SparseMetaData` objects of the same length.
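
  For example, a sketch of the sharing pattern described above (the names are
  illustrative):

  ```python
  stored_0, infos = _store_sparse_tensors(tensors_0, False, keep_input)
  stored_1, _ = _store_sparse_tensors(
      tensors_1, False, keep_input, [info.map_op for info in infos])
  # Both stored lists now write to the same `SparseTensorsMap` objects.
  ```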
  """
  maybe_shared_map_ops = shared_map_ops or [None] * len(tensor_list)

  def _sparse_meta_data(t, storing_op, map_op):
    if not isinstance(t, sparse_tensor.SparseTensor):
      return _SparseMetaData(False, None, None)
    rank = t.dense_shape.shape.with_rank(1).dims[0]
    if enqueue_many:
      rank -= 1
    # If a shared map_op was provided, use that. Otherwise use the name of
    # the operation used to store the SparseTensor.
    return _SparseMetaData(
        sparse=True, map_op=map_op or storing_op, rank=rank)

  def _maybe_store(t, shared_map_op):
    """Store Sparse tensor, if necessary."""
    if not isinstance(t, sparse_tensor.SparseTensor):
      return t
    map_op_name = shared_map_op.name if shared_map_op else None
    def _maybe_store_sparse(t, map_op_name, keep_input):
      """Conditionally store a single sparse Tensor."""
      return utils.smart_cond(
          keep_input,
          lambda: _store_sparse(t, shared_name=map_op_name),
          lambda: constant_op.constant(-1, dtypes.int64))
    def _maybe_store_many_sparse(t, map_op_name, keep_input):
      """Conditionally store multiple sparse Tensors."""
      out_tensor = utils.smart_cond(
          keep_input,
          lambda: _store_many_sparse(t, shared_name=map_op_name),
          lambda: -1 * array_ops.ones(array_ops.shape(t)[0:1], dtypes.int64))
      out_tensor.set_shape([None])  # necessary when t.ndims is unknown
      return out_tensor
    def _sparse_values_to_keep(t, keep_input):
      """Convert a per-row `keep_input` vector to a per-value one."""
      # Get the rows of every value in the sparse Tensor.
      row_values = t.indices[:, 0]
      # The value should be kept iff the row should be kept.
      return array_ops.gather(keep_input, row_values)
    if keep_input.shape.ndims == 1:
      t = sparse_ops.sparse_retain(t, _sparse_values_to_keep(t, keep_input))
      store_f = lambda t, name, _: _store_many_sparse(t, shared_name=name)
    elif enqueue_many:
      store_f = _maybe_store_many_sparse
    else:
      store_f = _maybe_store_sparse
    return store_f(t, map_op_name, keep_input)

  stored_list = [
      _maybe_store(t, shared_map_op) for t, shared_map_op
      in zip(tensor_list, maybe_shared_map_ops)]
  # Since the output of `_store{_many}_sparse` is wrapped in a tf.cond `Merge`,
  # we can't just get the Op of the resulting tensor.
  def _sparse_op(stored):
    for input_tensor in stored.op.inputs:
      if input_tensor.op.type in ("AddSparseToTensorsMap",
                                  "AddManySparseToTensorsMap"):
        return input_tensor.op
    # If there was no sparse input, then the original stored Tensor wasn't
    # sparse and we can just return the original Tensor's Op.
    return stored.op
  sparse_info_list = [
      _sparse_meta_data(t, _sparse_op(stored), shared_map_op)
      for t, stored, shared_map_op
      in zip(tensor_list, stored_list, maybe_shared_map_ops)]
  # Expand dims of stored tensors by 1 for proper enqueue shape
  stored_list = [
      array_ops.expand_dims(s, [-1]) if s_info.sparse else s
      for s, s_info in zip(stored_list, sparse_info_list)]
  return stored_list, sparse_info_list


def _store_sparse_tensors_join(tensor_list_list, enqueue_many, keep_input):
  """Store SparseTensors for feeding into batch_join, etc."""
  (s0, sparse_info_list) = _store_sparse_tensors(
      tensor_list_list[0], enqueue_many, keep_input)
  stored_list_list = [s0]
  for tensor_list in tensor_list_list[1:]:
    s, sparse_info_candidate = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input,
        [st.map_op for st in sparse_info_list])
    if sparse_info_list != sparse_info_candidate:
      raise ValueError("Inconsistent SparseTensors list: %s vs. %s"
                       % (tensor_list_list[0], tensor_list))
    sparse_info_list = [
        info.merge_with(candidate)
        for (info, candidate) in zip(sparse_info_list, sparse_info_candidate)]
    stored_list_list.append(s)

  return (stored_list_list, sparse_info_list)


def _restore_sparse_tensors(stored_list, sparse_info_list):
  """Restore SparseTensors after dequeue in batch, batch_join, etc."""
  received_sequence = isinstance(stored_list, collections.Sequence)
  if not received_sequence:
    stored_list = (stored_list,)
  tensors = [
      _restore_sparse(sparse_map_op=info.map_op,
                      sparse_handles=array_ops.squeeze(s, [1]),
                      rank=tensor_shape.dimension_value(info.rank + 1))
      if info.sparse else s
      for (s, info) in zip(stored_list, sparse_info_list)]
  has_st = any(isinstance(x, sparse_tensor.SparseTensor) for x in tensors)
  if has_st:
    t_values = [
        x.values if isinstance(x, sparse_tensor.SparseTensor)
        else x
        for x in tensors]
    with_deps = lambda x: control_flow_ops.with_dependencies(t_values, x)
    ensure_restore_tensors = [
        sparse_tensor.SparseTensor(indices=with_deps(x.indices),
                                   values=with_deps(x.values),
                                   dense_shape=with_deps(x.dense_shape))
        if isinstance(x, sparse_tensor.SparseTensor)
        else with_deps(x)
        for x in tensors]
  else:
    ensure_restore_tensors = tensors
  return ensure_restore_tensors if received_sequence else tensors[0]


def _validate(tensor_list):
  tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
  if not tensor_list:
    raise ValueError("Expected at least one tensor in batch().")
  return tensor_list


def _validate_join(tensor_list_list):
  tensor_list_list = [ops.convert_n_to_tensor_or_indexed_slices(tl)
                      for tl in tensor_list_list]
  if not tensor_list_list:
    raise ValueError("Expected at least one input in batch_join().")
  return tensor_list_list


def _validate_keep_input(keep_input, enqueue_many):
  """Validate `keep_input` argument to conditional batching functions."""
  keep_input = ops.convert_to_tensor(keep_input)
  if keep_input.shape.ndims is None:
    raise ValueError(
        "`keep_input` dimensions must be known at graph construction.")
  if not enqueue_many and keep_input.shape.ndims == 1:
    raise ValueError(
        "`keep_input` cannot be a vector when `enqueue_many=False`.")
  if keep_input.shape.ndims > 1:
    raise ValueError("`keep_input` must be 0 or 1 dimensions.")
  return keep_input


def _dtypes(tensor_list_list):
  all_types = [[t.dtype for t in tl] for tl in tensor_list_list]
  types = all_types[0]
  for other_types in all_types[1:]:
    if other_types != types:
      raise TypeError("Expected types to be consistent: %s vs. %s." %
                      (", ".join(x.name for x in types),
                       ", ".join(x.name for x in other_types)))
  return types
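

# For example, merging the shapes `[None, 3]` and `[2, 3]` below yields
# `[2, 3]`, while incompatible shapes raise an error; with
# `enqueue_many=True` the leading batch dimension is stripped before merging.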
def _merge_shapes(shape_list, enqueue_many):
  shape_list = [tensor_shape.as_shape(s) for s in shape_list]
  if enqueue_many:
    # We want the shapes without the leading batch dimension.
    shape_list = [s.with_rank_at_least(1)[1:] for s in shape_list]
  merged_shape = shape_list[0]
  for s in shape_list[1:]:
    merged_shape.merge_with(s)
  return merged_shape.as_list()


def _shapes(tensor_list_list, shapes, enqueue_many):
  """Calculate and merge the shapes of incoming tensors.

  Args:
    tensor_list_list: List of tensor lists.
    shapes: List of shape tuples corresponding to tensors within the lists.
    enqueue_many: Boolean describing whether shapes will be enqueued as
      batches or individual entries.

  Returns:
    A list of shapes aggregating shape inference info from `tensor_list_list`,
    or returning `shapes` if it is not `None`.

  Raises:
    ValueError: If any of the inferred shapes in `tensor_list_list` lack a
      well-defined rank.
  """
  if shapes is None:
    len0 = len(tensor_list_list[0])

    for tl in tensor_list_list:
      for i in xrange(len0):
        if tl[i].shape.ndims is None:
          raise ValueError("Cannot infer Tensor's rank: %s" % tl[i])

    shapes = [_merge_shapes(
        [tl[i].shape.as_list() for tl in tensor_list_list], enqueue_many)
              for i in xrange(len0)]
  return shapes


def _select_which_to_enqueue(tensor_list, keep_input):
  """Select which examples to enqueue based on vector `keep_input`."""
  select_i = math_ops.cast(keep_input, dtypes.int32)
  tensor_list = [
      data_flow_ops.dynamic_partition(x, select_i, num_partitions=2)[1]
      for x in tensor_list]
  return tensor_list


def _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input):
  """Enqueue `tensor_list_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [enqueue_fn(_select_which_to_enqueue(x, keep_input))
                   for x in tensor_list_list]
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tl),  # pylint:disable=cell-var-from-loop
        control_flow_ops.no_op) for tl in tensor_list_list]
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _enqueue(queue, tensor_list, threads, enqueue_many, keep_input):
  """Enqueue `tensor_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [
        enqueue_fn(_select_which_to_enqueue(tensor_list, keep_input))] * threads
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tensor_list),
        control_flow_ops.no_op)] * threads
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _which_queue(dynamic_pad):
  return (data_flow_ops.PaddingFIFOQueue if dynamic_pad
          else data_flow_ops.FIFOQueue)


def _batch(tensors, batch_size, keep_input, num_threads=1, capacity=32,
           enqueue_many=False, shapes=None, dynamic_pad=False,
           allow_smaller_final_batch=False, shared_name=None,
           name=None):
  """Helper function for `batch` and `maybe_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "batch", list(tensor_list) + [keep_input]) as name:
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    (tensor_list, sparse_info) = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be
# a multiple of len(tensor_list_list)?) parameter, to address the use
# case where you want more parallelism than you can support different
# readers (either because you don't have that many files or can't
# read that many files in parallel due to the number of seeks required).
# Once this is done, batch() can be written as a call to batch_join().
def _batch_join(tensors_list, batch_size, keep_input, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Helper function for `batch_join` and `maybe_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)


def _shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                   keep_input, num_threads=1, seed=None, enqueue_many=False,
                   shapes=None, allow_smaller_final_batch=False,
                   shared_name=None, name=None):
  """Helper function for `shuffle_batch` and `maybe_shuffle_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "shuffle_batch",
                      list(tensor_list) + [keep_input]) as name:
    if capacity <= min_after_dequeue:
      raise ValueError("capacity %d must be bigger than min_after_dequeue %d."
                       % (capacity, min_after_dequeue))
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list, sparse_info = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


def _shuffle_batch_join(tensors_list, batch_size, capacity,
                        min_after_dequeue, keep_input, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Helper function for `shuffle_batch_join` and `maybe_shuffle_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "shuffle_batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)

# Batching functions ----------------------------------------------------------


@tf_export(v1=["train.batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.batch(batch_size)` (or `padded_batch(...)` if "
    "`dynamic_pad=True`).")
def batch(tensors, batch_size, num_threads=1, capacity=32,
          enqueue_many=False, shapes=None, dynamic_pad=False,
          allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches of tensors in `tensors`.

  The argument `tensors` can be a list or a dictionary of tensors.
  The value returned by the function will be of the same type
  as `tensors`.

  This function is implemented using a queue. A `QueueRunner` for the
  queue is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a single
  example.  An input tensor with shape `[x, y, z]` will be output as a tensor
  with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a batch of
  examples, where the first dimension is indexed by example, and all members of
  `tensors` should have the same size in the first dimension.  If an input
  tensor has shape `[*, x, y, z]`, the output will have shape `[batch_size, x,
  y, z]`.  The `capacity` argument controls how long the prefetching is
  allowed to grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors` must have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have value `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0.  For strings, this padding is
  the empty string.  See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.
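
  For example, a minimal sketch (`single_image` and `single_label` are
  illustrative tensors holding one example):

  ```python
  # Creates batches of 32 images and 32 labels.
  image_batch, label_batch = tf.train.batch(
      [single_image, single_label],
      batch_size=32,
      num_threads=4,
      capacity=50000)
  ```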

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`.  The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors` (except if
    the input is a list of one element, then it returns a tensor, not a list).

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch(
      tensors,
      batch_size,
      keep_input=True,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.maybe_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).batch(batch_size)` (or `padded_batch(...)`"
    " if `dynamic_pad=True`).")
def maybe_batch(tensors, keep_input, batch_size, num_threads=1, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Conditionally creates batches of tensors based on `keep_input`.

  See docstring in `batch` for more details.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates to `True`,
      then `tensors` are all added to the queue. If it is a vector and
      `enqueue_many` is `True`, then each example is added to the queue only
      if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`.  The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.
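
  For example, a sketch of per-example filtering with a vector `keep_input`
  (the tensors shown are illustrative):

  ```python
  examples = tf.constant([1.0, 2.0, 3.0, 4.0])
  keep = examples > 2.0  # one boolean per example
  batched = tf.train.maybe_batch(
      [examples], keep_input=keep, batch_size=2, enqueue_many=True)
  # Only 3.0 and 4.0 are enqueued and batched.
  ```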
  """
  return _batch(
      tensors,
      batch_size,
      keep_input,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False,
               shapes=None, dynamic_pad=False, allow_smaller_final_batch=False,
               shared_name=None, name=None):
  """Runs a list of tensors to fill a queue to create batches of examples.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors.  Each element in the list is treated similarly
  to the `tensors` argument of `tf.train.batch()`.

  WARNING: This function is nondeterministic, since it starts a separate thread
  for each tensor.

  Enqueues a different list of tensors in different threads.
  Implemented using a queue -- a `QueueRunner` for the queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  `len(tensors_list)` threads will be started,
  with thread `i` enqueuing the tensors from
  `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first
  dimension if `enqueue_many` is true.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example. An input tensor `x` will be output as a
  tensor with shape `[batch_size] + x.shape`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension.  The slices of any input tensor
  `x` are treated as examples, and the output tensors will have shape
  `[batch_size] + x.shape[1:]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors_list` must have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have value `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0.  For strings, this padding is
  the empty string.  See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.
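
  For example, a minimal sketch (`read_example_list` and `file_queues` are
  illustrative; any per-thread lists of matching tensors work):

  ```python
  # One enqueuing thread per element of `example_lists`.
  example_lists = [read_example_list(q) for q in file_queues]
  example_batch, label_batch = tf.train.batch_join(
      example_lists, batch_size=32)
  ```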

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors_list[i]` is a single
      example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input=True,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.maybe_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def maybe_batch_join(tensors_list, keep_input, batch_size, capacity=32,
                     enqueue_many=False, shapes=None, dynamic_pad=False,
                     allow_smaller_final_batch=False, shared_name=None,
                     name=None):
  """Runs a list of tensors to conditionally fill a queue to create batches.

  See docstring in `batch_join` for more details.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates to `True`,
      then `tensors` are all added to the queue. If it is a vector and
      `enqueue_many` is `True`, then each example is added to the queue only
      if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors_list[i]` is a single
      example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.shuffle_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.shuffle(min_after_dequeue).batch(batch_size)`.")
def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                  num_threads=1, seed=None, enqueue_many=False, shapes=None,
                  allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches by randomly shuffling tensors.

  This function adds the following to the current `Graph`:

  * A shuffling queue into which tensors from `tensors` are enqueued.
  * A `dequeue_many` operation to create batches from the queue.
  * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors
    from `tensors`.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a
  single example.  An input tensor with shape `[x, y, z]` will be output
  as a tensor with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a
  batch of examples, where the first dimension is indexed by example,
  and all members of `tensors` should have the same size in the
  first dimension.  If an input tensor has shape `[*, x, y, z]`, the
  output will have shape `[batch_size, x, y, z]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  For example:

  ```python
  # Creates batches of 32 images and 32 labels.
  image_batch, label_batch = tf.train.shuffle_batch(
        [single_image, single_label],
        batch_size=32,
        num_threads=4,
        capacity=50000,
        min_after_dequeue=10000)
  ```

  *N.B.:* You must ensure that either (i) the `shapes` argument is
  passed, or (ii) all of the tensors in `tensors` must have
  fully-defined shapes. `ValueError` will be raised if neither of
  these conditions holds.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    num_threads: The number of threads enqueuing `tensors`.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch(
      tensors,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input=True,
      num_threads=num_threads,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.maybe_shuffle_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).shuffle(min_after_dequeue).batch(batch_size)`"
    ".")
def maybe_shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                        keep_input, num_threads=1, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Creates batches by randomly shuffling conditionally-enqueued tensors.

  See docstring in `shuffle_batch` for more details.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates to `True`,
      then `tensors` are all added to the queue. If it is a vector and
      `enqueue_many` is `True`, then each example is added to the queue only
      if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    keep_input: A `bool` Tensor. This tensor controls whether the input is
      added to the queue or not. If it is a scalar and evaluates to `True`,
      then `tensors` are all added to the queue. If it is a vector and
      `enqueue_many` is `True`, then each example is added to the queue only
      if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.
    num_threads: The number of threads enqueuing `tensors`.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch(
      tensors,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input,
      num_threads=num_threads,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.shuffle_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).shuffle(min_after_dequeue).batch"
    "(batch_size)`.")
def shuffle_batch_join(tensors_list, batch_size, capacity,
                       min_after_dequeue, seed=None, enqueue_many=False,
                       shapes=None, allow_smaller_final_batch=False,
                       shared_name=None, name=None):
  """Creates batches by randomly shuffling tensors.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors. Each element in the list is treated similarly
  to the `tensors` argument of `tf.train.shuffle_batch()`.

  This version enqueues a different list of tensors in different threads.
  It adds the following to the current `Graph`:

  * A shuffling queue into which tensors from `tensors_list` are enqueued.
  * A `dequeue_many` operation to create batches from the queue.
  * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors
    from `tensors_list`.

  `len(tensors_list)` threads will be started, with thread `i` enqueuing
  the tensors from `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first dimension if
  `enqueue_many` is `True`.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example. An input tensor with shape `[x, y, z]`
  will be output as a tensor with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension. If an input tensor has shape `[*, x,
  y, z]`, the output will have shape `[batch_size, x, y, z]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.
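
  For example, a rough sketch (assuming `read_example` is a user-supplied
  function that returns a fresh `[image, label]` pair on each call, reading
  from an assumed `filename_queue`):

  ```python
  # Sketch: each element of example_list gets its own enqueuing thread,
  # so four readers feed the shuffling queue in parallel.
  example_list = [read_example(filename_queue) for _ in range(4)]
  image_batch, label_batch = tf.train.shuffle_batch_join(
      example_list,
      batch_size=32,
      capacity=50000,
      min_after_dequeue=10000)
  ```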

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching it yourself.

  If `allow_smaller_final_batch` is `True`, a batch smaller than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch; otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors_list[i]`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch_join(
      tensors_list,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input=True,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.maybe_shuffle_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).shuffle(min_after_dequeue)"
    ".batch(batch_size)`.")
def maybe_shuffle_batch_join(tensors_list, batch_size, capacity,
                             min_after_dequeue, keep_input, seed=None,
                             enqueue_many=False, shapes=None,
                             allow_smaller_final_batch=False, shared_name=None,
                             name=None):
  """Creates batches by randomly shuffling conditionally-enqueued tensors.

  See docstring in `shuffle_batch_join` for more details.
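
  For context, the `tf.data` replacement suggested in the deprecation note
  might look like the following sketch, where `filenames`, `make_dataset`,
  and `predicate` are hypothetical stand-ins for user code:

  ```python
  # Sketch of the tf.data analogue: interleave several per-file datasets,
  # drop filtered-out examples, then shuffle and batch.
  dataset = (tf.data.Dataset.from_tensor_slices(filenames)
             .interleave(make_dataset, cycle_length=4)
             .filter(predicate)
             .shuffle(min_after_dequeue)
             .batch(batch_size))
  ```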

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    keep_input: A `bool` Tensor. This tensor controls whether the input is
      added to the queue or not. If it is a scalar and evaluates to `True`,
      then the tensors in `tensors_list` are all added to the queue. If it
      is a vector and `enqueue_many` is `True`, then each example is added
      to the queue only if the corresponding value in `keep_input` is
      `True`. This tensor essentially acts as a filtering mechanism.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors_list[i]`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch_join(
      tensors_list,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)