1# Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15 16"""Input pipeline. 17 18Please see the [reading data 19how-to](https://tensorflow.org/api_guides/python/reading_data) 20for context. 21""" 22 23from __future__ import absolute_import 24from __future__ import division 25from __future__ import print_function 26 27from six.moves import xrange # pylint: disable=redefined-builtin 28 29from tensorflow.python.eager import context 30from tensorflow.python.framework import constant_op 31from tensorflow.python.framework import dtypes 32from tensorflow.python.framework import ops 33from tensorflow.python.framework import sparse_tensor 34from tensorflow.python.framework import tensor_shape 35from tensorflow.python.layers import utils 36from tensorflow.python.ops import array_ops 37from tensorflow.python.ops import control_flow_ops 38from tensorflow.python.ops import data_flow_ops 39from tensorflow.python.ops import io_ops 40from tensorflow.python.ops import math_ops 41from tensorflow.python.ops import random_ops 42from tensorflow.python.ops import sparse_ops 43from tensorflow.python.ops import variable_scope as vs 44from tensorflow.python.summary import summary 45from tensorflow.python.training import queue_runner 46from tensorflow.python.util import deprecation 47from tensorflow.python.util.compat import collections_abc 48from tensorflow.python.util.tf_export import tf_export 49 50 51# pylint: disable=protected-access 52_store_sparse = sparse_ops._add_sparse_to_tensors_map 53_store_many_sparse = sparse_ops._add_many_sparse_to_tensors_map 54_restore_sparse = sparse_ops._take_many_sparse_from_tensors_map 55# pylint: enable=protected-access 56 57 58@tf_export( 59 "io.match_filenames_once", 60 v1=["io.match_filenames_once", "train.match_filenames_once"]) 61@deprecation.deprecated_endpoints("train.match_filenames_once") 62def match_filenames_once(pattern, name=None): 63 """Save the list of files matching pattern, so it is only computed once. 64 65 NOTE: The order of the files returned is deterministic. 66 67 Args: 68 pattern: A file pattern (glob), or 1D tensor of file patterns. 69 name: A name for the operations (optional). 70 71 Returns: 72 A variable that is initialized to the list of files matching the pattern(s). 73 """ 74 with ops.name_scope(name, "matching_filenames", [pattern]) as name: 75 return vs.variable( 76 name=name, initial_value=io_ops.matching_files(pattern), 77 trainable=False, validate_shape=False, 78 collections=[ops.GraphKeys.LOCAL_VARIABLES]) 79 80 81@tf_export(v1=["train.limit_epochs"]) 82@deprecation.deprecated( 83 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 84 "`tf.data.Dataset.from_tensors(tensor).repeat(num_epochs)`.") 85def limit_epochs(tensor, num_epochs=None, name=None): 86 """Returns tensor `num_epochs` times and then raises an `OutOfRange` error. 87 88 Note: creates local counter `epochs`. Use `local_variables_initializer()` to 89 initialize local variables. 90 91 Args: 92 tensor: Any `Tensor`. 93 num_epochs: A positive integer (optional). If specified, limits the number 94 of steps the output tensor may be evaluated. 95 name: A name for the operations (optional). 96 97 Returns: 98 tensor or `OutOfRange`. 99 100 Raises: 101 ValueError: if `num_epochs` is invalid. 102 """ 103 if num_epochs is None: 104 return tensor 105 if num_epochs <= 0: 106 raise ValueError("num_epochs must be > 0 not %d." % num_epochs) 107 with ops.name_scope(name, "limit_epochs", [tensor]) as name: 108 zero64 = constant_op.constant(0, dtype=dtypes.int64) 109 epochs = vs.variable( 110 zero64, name="epochs", trainable=False, 111 collections=[ops.GraphKeys.LOCAL_VARIABLES]) 112 counter = epochs.count_up_to(num_epochs) 113 with ops.control_dependencies([counter]): 114 return array_ops.identity(tensor, name=name) 115 116 117@tf_export(v1=["train.input_producer"]) 118@deprecation.deprecated( 119 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 120 "`tf.data.Dataset.from_tensor_slices(input_tensor).shuffle" 121 "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If " 122 "`shuffle=False`, omit the `.shuffle(...)`.") 123def input_producer(input_tensor, 124 element_shape=None, 125 num_epochs=None, 126 shuffle=True, 127 seed=None, 128 capacity=32, 129 shared_name=None, 130 summary_name=None, 131 name=None, 132 cancel_op=None): 133 """Output the rows of `input_tensor` to a queue for an input pipeline. 134 135 Note: if `num_epochs` is not `None`, this function creates local counter 136 `epochs`. Use `local_variables_initializer()` to initialize local variables. 137 138 Args: 139 input_tensor: A tensor with the rows to produce. Must be at least 140 one-dimensional. Must either have a fully-defined shape, or 141 `element_shape` must be defined. 142 element_shape: (Optional.) A `TensorShape` representing the shape of a 143 row of `input_tensor`, if it cannot be inferred. 144 num_epochs: (Optional.) An integer. If specified `input_producer` produces 145 each row of `input_tensor` `num_epochs` times before generating an 146 `OutOfRange` error. If not specified, `input_producer` can cycle through 147 the rows of `input_tensor` an unlimited number of times. 148 shuffle: (Optional.) A boolean. If true, the rows are randomly shuffled 149 within each epoch. 150 seed: (Optional.) An integer. The seed to use if `shuffle` is true. 151 capacity: (Optional.) The capacity of the queue to be used for buffering 152 the input. 153 shared_name: (Optional.) If set, this queue will be shared under the given 154 name across multiple sessions. 155 summary_name: (Optional.) If set, a scalar summary for the current queue 156 size will be generated, using this name as part of the tag. 157 name: (Optional.) A name for queue. 158 cancel_op: (Optional.) Cancel op for the queue 159 160 Returns: 161 A queue with the output rows. A `QueueRunner` for the queue is 162 added to the current `QUEUE_RUNNER` collection of the current 163 graph. 164 165 Raises: 166 ValueError: If the shape of the input cannot be inferred from the arguments. 167 RuntimeError: If called with eager execution enabled. 168 169 @compatibility(eager) 170 Input pipelines based on Queues are not supported when eager execution is 171 enabled. Please use the `tf.data` API to ingest data under eager execution. 172 @end_compatibility 173 """ 174 if context.executing_eagerly(): 175 raise RuntimeError( 176 "Input pipelines based on Queues are not supported when eager execution" 177 " is enabled. Please use tf.data to ingest data into your model" 178 " instead.") 179 with ops.name_scope(name, "input_producer", [input_tensor]): 180 input_tensor = ops.convert_to_tensor(input_tensor, name="input_tensor") 181 element_shape = input_tensor.shape[1:].merge_with(element_shape) 182 if not element_shape.is_fully_defined(): 183 raise ValueError("Either `input_tensor` must have a fully defined shape " 184 "or `element_shape` must be specified") 185 186 if shuffle: 187 input_tensor = random_ops.random_shuffle(input_tensor, seed=seed) 188 189 input_tensor = limit_epochs(input_tensor, num_epochs) 190 191 q = data_flow_ops.FIFOQueue(capacity=capacity, 192 dtypes=[input_tensor.dtype.base_dtype], 193 shapes=[element_shape], 194 shared_name=shared_name, name=name) 195 enq = q.enqueue_many([input_tensor]) 196 queue_runner.add_queue_runner( 197 queue_runner.QueueRunner( 198 q, [enq], cancel_op=cancel_op)) 199 if summary_name is not None: 200 summary.scalar(summary_name, 201 math_ops.cast(q.size(), dtypes.float32) * (1. / capacity)) 202 return q 203 204 205@tf_export(v1=["train.string_input_producer"]) 206@deprecation.deprecated( 207 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 208 "`tf.data.Dataset.from_tensor_slices(string_tensor).shuffle" 209 "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If " 210 "`shuffle=False`, omit the `.shuffle(...)`.") 211def string_input_producer(string_tensor, 212 num_epochs=None, 213 shuffle=True, 214 seed=None, 215 capacity=32, 216 shared_name=None, 217 name=None, 218 cancel_op=None): 219 """Output strings (e.g. filenames) to a queue for an input pipeline. 220 221 Note: if `num_epochs` is not `None`, this function creates local counter 222 `epochs`. Use `local_variables_initializer()` to initialize local variables. 223 224 Args: 225 string_tensor: A 1-D string tensor with the strings to produce. 226 num_epochs: An integer (optional). If specified, `string_input_producer` 227 produces each string from `string_tensor` `num_epochs` times before 228 generating an `OutOfRange` error. If not specified, 229 `string_input_producer` can cycle through the strings in `string_tensor` 230 an unlimited number of times. 231 shuffle: Boolean. If true, the strings are randomly shuffled within each 232 epoch. 233 seed: An integer (optional). Seed used if shuffle == True. 234 capacity: An integer. Sets the queue capacity. 235 shared_name: (optional). If set, this queue will be shared under the given 236 name across multiple sessions. All sessions open to the device which has 237 this queue will be able to access it via the shared_name. Using this in 238 a distributed setting means each name will only be seen by one of the 239 sessions which has access to this operation. 240 name: A name for the operations (optional). 241 cancel_op: Cancel op for the queue (optional). 242 243 Returns: 244 A queue with the output strings. A `QueueRunner` for the Queue 245 is added to the current `Graph`'s `QUEUE_RUNNER` collection. 246 247 Raises: 248 ValueError: If the string_tensor is a null Python list. At runtime, 249 will fail with an assertion if string_tensor becomes a null tensor. 250 251 @compatibility(eager) 252 Input pipelines based on Queues are not supported when eager execution is 253 enabled. Please use the `tf.data` API to ingest data under eager execution. 254 @end_compatibility 255 """ 256 not_null_err = "string_input_producer requires a non-null input tensor" 257 if not isinstance(string_tensor, ops.Tensor) and not string_tensor: 258 raise ValueError(not_null_err) 259 260 with ops.name_scope(name, "input_producer", [string_tensor]) as name: 261 string_tensor = ops.convert_to_tensor(string_tensor, dtype=dtypes.string) 262 with ops.control_dependencies([ 263 control_flow_ops.Assert( 264 math_ops.greater(array_ops.size(string_tensor), 0), 265 [not_null_err])]): 266 string_tensor = array_ops.identity(string_tensor) 267 return input_producer( 268 input_tensor=string_tensor, 269 element_shape=[], 270 num_epochs=num_epochs, 271 shuffle=shuffle, 272 seed=seed, 273 capacity=capacity, 274 shared_name=shared_name, 275 name=name, 276 summary_name="fraction_of_%d_full" % capacity, 277 cancel_op=cancel_op) 278 279 280@tf_export(v1=["train.range_input_producer"]) 281@deprecation.deprecated( 282 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 283 "`tf.data.Dataset.range(limit).shuffle(limit).repeat(num_epochs)`. If " 284 "`shuffle=False`, omit the `.shuffle(...)`.") 285def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None, 286 capacity=32, shared_name=None, name=None): 287 """Produces the integers from 0 to limit-1 in a queue. 288 289 Note: if `num_epochs` is not `None`, this function creates local counter 290 `epochs`. Use `local_variables_initializer()` to initialize local variables. 291 292 Args: 293 limit: An int32 scalar tensor. 294 num_epochs: An integer (optional). If specified, `range_input_producer` 295 produces each integer `num_epochs` times before generating an 296 OutOfRange error. If not specified, `range_input_producer` can cycle 297 through the integers an unlimited number of times. 298 shuffle: Boolean. If true, the integers are randomly shuffled within each 299 epoch. 300 seed: An integer (optional). Seed used if shuffle == True. 301 capacity: An integer. Sets the queue capacity. 302 shared_name: (optional). If set, this queue will be shared under the given 303 name across multiple sessions. 304 name: A name for the operations (optional). 305 306 Returns: 307 A Queue with the output integers. A `QueueRunner` for the Queue 308 is added to the current `Graph`'s `QUEUE_RUNNER` collection. 309 310 @compatibility(eager) 311 Input pipelines based on Queues are not supported when eager execution is 312 enabled. Please use the `tf.data` API to ingest data under eager execution. 313 @end_compatibility 314 """ 315 with ops.name_scope(name, "input_producer", [limit]) as name: 316 range_tensor = math_ops.range(limit) 317 return input_producer( 318 range_tensor, [], num_epochs, shuffle, seed, capacity, 319 shared_name, "fraction_of_%d_full" % capacity, name) 320 321 322@tf_export(v1=["train.slice_input_producer"]) 323@deprecation.deprecated( 324 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 325 "`tf.data.Dataset.from_tensor_slices(tuple(tensor_list)).shuffle" 326 "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If " 327 "`shuffle=False`, omit the `.shuffle(...)`.") 328def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None, 329 capacity=32, shared_name=None, name=None): 330 """Produces a slice of each `Tensor` in `tensor_list`. 331 332 Implemented using a Queue -- a `QueueRunner` for the Queue 333 is added to the current `Graph`'s `QUEUE_RUNNER` collection. 334 335 Args: 336 tensor_list: A list of `Tensor` objects. Every `Tensor` in 337 `tensor_list` must have the same size in the first dimension. 338 num_epochs: An integer (optional). If specified, `slice_input_producer` 339 produces each slice `num_epochs` times before generating 340 an `OutOfRange` error. If not specified, `slice_input_producer` can cycle 341 through the slices an unlimited number of times. 342 shuffle: Boolean. If true, the integers are randomly shuffled within each 343 epoch. 344 seed: An integer (optional). Seed used if shuffle == True. 345 capacity: An integer. Sets the queue capacity. 346 shared_name: (optional). If set, this queue will be shared under the given 347 name across multiple sessions. 348 name: A name for the operations (optional). 349 350 Returns: 351 A list of tensors, one for each element of `tensor_list`. If the tensor 352 in `tensor_list` has shape `[N, a, b, .., z]`, then the corresponding output 353 tensor will have shape `[a, b, ..., z]`. 354 355 Raises: 356 ValueError: if `slice_input_producer` produces nothing from `tensor_list`. 357 358 @compatibility(eager) 359 Input pipelines based on Queues are not supported when eager execution is 360 enabled. Please use the `tf.data` API to ingest data under eager execution. 361 @end_compatibility 362 """ 363 with ops.name_scope(name, "input_producer", tensor_list): 364 tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list) 365 if not tensor_list: 366 raise ValueError( 367 "Expected at least one tensor in slice_input_producer().") 368 range_size = array_ops.shape(tensor_list[0])[0] 369 # TODO(josh11b): Add an assertion that the first dimension of 370 # everything in TensorList matches. Maybe just check the inferred shapes? 371 queue = range_input_producer(range_size, num_epochs=num_epochs, 372 shuffle=shuffle, seed=seed, capacity=capacity, 373 shared_name=shared_name) 374 index = queue.dequeue() 375 output = [array_ops.gather(t, index) for t in tensor_list] 376 return output 377 378 379# Helpers for the batching functions ------------------------------------------ 380 381 382def _flatten(tensor_list_list): 383 return [tensor for tensor_list in tensor_list_list for tensor in tensor_list] 384 385 386class _SparseMetaData(object): 387 """Store information about the Tensor: Is it sparse?, map_op, and rank.""" 388 389 def __init__(self, sparse, map_op, rank): 390 """Create the metadata. 391 392 Args: 393 sparse: Python boolean. 394 map_op: The `Operation` that created the `SparseTensorsMap` in question. 395 This Op contains information about the underlying Map object and the 396 dtype of the original data. 397 rank: The statically known rank of the `SparseTensor`. 398 """ 399 self._sparse = sparse 400 self._map_op = map_op 401 self._rank = tensor_shape.as_dimension(rank) 402 403 def __eq__(self, other): 404 if self.sparse != other.sparse: 405 return False 406 if not self.sparse: 407 return True 408 # If map_ops are not the same, the data source is not the same. 409 if (self.map_op is not None) != (other.map_op is not None): 410 return False 411 if self.map_op != other.map_op: 412 return False 413 if not self.rank.is_compatible_with(other.rank): 414 return False 415 return True 416 417 def __ne__(self, other): 418 return not self.__eq__(other) 419 420 def __str__(self): 421 return "[SparseMetaData(%s, %s, %s)]" % (self.sparse, self.map_op.name, 422 self.rank) 423 424 def merge_with(self, other): 425 if self != other: 426 raise ValueError("SparseMetaData objects are incompatible: %s vs. %s" 427 % (self, other)) 428 if self.sparse: 429 self.rank.merge_with(other.rank) 430 return self 431 432 @property 433 def map_op(self): 434 return self._map_op 435 436 @property 437 def sparse(self): 438 return self._sparse 439 440 @property 441 def rank(self): 442 return self._rank 443 444 445def _as_tensor_list(tensors): 446 if isinstance(tensors, dict): 447 return [tensors[k] for k in sorted(tensors, key=str)] 448 else: 449 return tensors 450 451 452def _as_tensor_list_list(tensors_list): 453 if not tensors_list: 454 raise ValueError("Expected at least one set of tensors") 455 if isinstance(tensors_list[0], dict): 456 expected_keys = set(tensors_list[0].keys()) 457 for tensors in tensors_list[1:]: 458 if set(tensors.keys()) != expected_keys: 459 raise ValueError("All dictionaries in tensors_list must have " 460 "the same keys") 461 return [_as_tensor_list(tensors) for tensors in tensors_list] 462 else: 463 return tensors_list 464 465 466def _as_original_type(original_tensors, tensor_list): 467 if isinstance(original_tensors, dict): 468 if len(original_tensors) == 1: 469 # tensor_list is bogusly returned as a single tensor if only one tensor 470 # was enqueued. Make it a list again. See b/28117485. 471 tensor_list = [tensor_list] 472 return {k: tensor_list[i] 473 for i, k in enumerate(sorted(original_tensors, key=str))} 474 else: 475 return tensor_list 476 477 478def _store_sparse_tensors(tensor_list, enqueue_many, keep_input, 479 shared_map_ops=None): 480 """Store SparseTensors for feeding into batch, etc. 481 482 If `shared_map_ops` is provided, the underlying `SparseTensorsMap` objects 483 are reused (shared). This argument is useful for, e.g., `batch_join` 484 where multiple enqueue operations write to the same Queue component, 485 and another (dequeue) thread reads from that same location and must then 486 restore the associated `SparseTensor` objects. In this case, the sparse 487 restore must have a single `SparseTensorMap` from which to read out the 488 handles; so a single `SparseTensorMap` must be shared for storing 489 across the multiple enqueue operations. This sharing is performed by 490 calling `_store_sparse_tensors` the first time with `shared_map_ops=None`, 491 and then in subsequent times with this value set to the list of `Operation` 492 objects created in the first call. 493 494 Args: 495 tensor_list: List of `Tensor` and `SparseTensor` objects. 496 enqueue_many: Python `Boolean`. 497 keep_input: Must be a scalar bool Tensor (not a Python bool). If False, 498 don't store. 499 shared_map_ops: (optional) List of `Operation` objects from a previous 500 call to `_store_sparse_tensors`. If not `None`, the op types should be 501 one of `AddSparseToTensorsMap` or `AddManySparseToTensorsMap` in the 502 locations corresponding to `SparseTensors` in `tensor_list`. 503 504 Returns: 505 A tuple `(stored_list, sparse_info_list)` where `stored_list` is a list 506 of `Tensor` objects (same length as `tensor_list`) and `sparse_info_list` 507 is a list of the same length of `_SparseMetaData` objects. 508 """ 509 maybe_shared_map_ops = shared_map_ops or [None] * len(tensor_list) 510 511 def _sparse_meta_data(t, storing_op, map_op): 512 if not isinstance(t, sparse_tensor.SparseTensor): 513 return _SparseMetaData(False, None, None) 514 rank = t.dense_shape.shape.with_rank(1).dims[0] 515 if enqueue_many: 516 rank -= 1 517 # If a shared map_op was provided, use that. Otherwise use the name of 518 # the operation used to store the SparseTensor. 519 return _SparseMetaData( 520 sparse=True, map_op=map_op or storing_op, rank=rank) 521 522 def _maybe_store(t, shared_map_op): 523 """Store Sparse tensor, if necessary.""" 524 if not isinstance(t, sparse_tensor.SparseTensor): 525 return t 526 map_op_name = shared_map_op.name if shared_map_op else None 527 def _maybe_store_sparse(t, map_op_name, keep_input): 528 """Conditionally store a single sparse Tensor.""" 529 return utils.smart_cond( 530 keep_input, 531 lambda: _store_sparse(t, shared_name=map_op_name), 532 lambda: constant_op.constant(-1, dtypes.int64)) 533 def _maybe_store_many_sparse(t, map_op_name, keep_input): 534 """Conditionally store multiple sparse Tensors.""" 535 out_tensor = utils.smart_cond( 536 keep_input, 537 lambda: _store_many_sparse(t, shared_name=map_op_name), 538 lambda: -1 * array_ops.ones(array_ops.shape(t)[0:1], dtypes.int64)) 539 out_tensor.set_shape([None]) # necessary when t.ndims is unknown 540 return out_tensor 541 def _sparse_values_to_keep(t, keep_input): 542 """Convert a per-row `keep_input` vector to a per-value one.""" 543 # Get the rows of every value in the sparse Tensor. 544 row_values = t.indices[:, 0] 545 # The value should be kept iff the row should be kept. 546 return array_ops.gather(keep_input, row_values) 547 if keep_input.shape.ndims == 1: 548 t = sparse_ops.sparse_retain(t, _sparse_values_to_keep(t, keep_input)) 549 store_f = lambda t, name, _: _store_many_sparse(t, shared_name=name) 550 elif enqueue_many: 551 store_f = _maybe_store_many_sparse 552 else: 553 store_f = _maybe_store_sparse 554 return store_f(t, map_op_name, keep_input) 555 556 stored_list = [ 557 _maybe_store(t, shared_map_op) for t, shared_map_op 558 in zip(tensor_list, maybe_shared_map_ops)] 559 # Since the output of `_store{_many}_sparse is wrapped in a tf.cond `Merge`, 560 # we can't just get the Op of the resulting tensor. 561 def _sparse_op(stored): 562 for input_tensor in stored.op.inputs: 563 if input_tensor.op.type in ("AddSparseToTensorsMap", 564 "AddManySparseToTensorsMap"): 565 return input_tensor.op 566 # If there was no sparse input, then the original stored Tensor wasn't 567 # sparse and we can just return the original Tensor's Op. 568 return stored.op 569 sparse_info_list = [ 570 _sparse_meta_data(t, _sparse_op(stored), shared_map_op) 571 for t, stored, shared_map_op 572 in zip(tensor_list, stored_list, maybe_shared_map_ops)] 573 # Expand dims of stored tensors by 1 for proper enqueue shape 574 stored_list = [ 575 array_ops.expand_dims(s, [-1]) if s_info.sparse else s 576 for s, s_info in zip(stored_list, sparse_info_list)] 577 return stored_list, sparse_info_list 578 579 580def _store_sparse_tensors_join(tensor_list_list, enqueue_many, keep_input): 581 """Store SparseTensors for feeding into batch_join, etc.""" 582 (s0, sparse_info_list) = _store_sparse_tensors( 583 tensor_list_list[0], enqueue_many, keep_input) 584 stored_list_list = [s0] 585 for tensor_list in tensor_list_list[1:]: 586 s, sparse_info_candidate = _store_sparse_tensors( 587 tensor_list, enqueue_many, keep_input, 588 [st.map_op for st in sparse_info_list]) 589 if sparse_info_list != sparse_info_candidate: 590 raise ValueError("Inconsistent SparseTensors list: %s vs. %s" 591 % (tensor_list_list[0], tensor_list)) 592 sparse_info_list = [ 593 info.merge_with(candidate) 594 for (info, candidate) in zip(sparse_info_list, sparse_info_candidate)] 595 stored_list_list.append(s) 596 597 return (stored_list_list, sparse_info_list) 598 599 600def _restore_sparse_tensors(stored_list, sparse_info_list): 601 """Restore SparseTensors after dequeue in batch, batch_join, etc.""" 602 received_sequence = isinstance(stored_list, collections_abc.Sequence) 603 if not received_sequence: 604 stored_list = (stored_list,) 605 tensors = [ 606 _restore_sparse(sparse_map_op=info.map_op, 607 sparse_handles=array_ops.squeeze(s, [1]), 608 rank=tensor_shape.dimension_value(info.rank + 1)) 609 if info.sparse else s 610 for (s, info) in zip(stored_list, sparse_info_list)] 611 has_st = any(isinstance(x, sparse_tensor.SparseTensor) for x in tensors) 612 if has_st: 613 t_values = [ 614 x.values if isinstance(x, sparse_tensor.SparseTensor) 615 else x 616 for x in tensors] 617 with_deps = lambda x: control_flow_ops.with_dependencies(t_values, x) 618 ensure_restore_tensors = [ 619 sparse_tensor.SparseTensor(indices=with_deps(x.indices), 620 values=with_deps(x.values), 621 dense_shape=with_deps(x.dense_shape)) 622 if isinstance(x, sparse_tensor.SparseTensor) 623 else with_deps(x) 624 for x in tensors] 625 else: 626 ensure_restore_tensors = tensors 627 return ensure_restore_tensors if received_sequence else tensors[0] 628 629 630def _validate(tensor_list): 631 tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list) 632 if not tensor_list: 633 raise ValueError("Expected at least one tensor in batch().") 634 return tensor_list 635 636 637def _validate_join(tensor_list_list): 638 tensor_list_list = [ops.convert_n_to_tensor_or_indexed_slices(tl) 639 for tl in tensor_list_list] 640 if not tensor_list_list: 641 raise ValueError("Expected at least one input in batch_join().") 642 return tensor_list_list 643 644 645def _validate_keep_input(keep_input, enqueue_many): 646 """Validate `keep_input` argument to conditional batching functions.""" 647 keep_input = ops.convert_to_tensor(keep_input) 648 if keep_input.shape.ndims is None: 649 raise ValueError( 650 "`keep_input` dimensions must be known at graph construction.") 651 if not enqueue_many and keep_input.shape.ndims == 1: 652 raise ValueError( 653 "`keep_input` cannot be a vector when `enqueue_many=False`.") 654 if keep_input.shape.ndims > 1: 655 raise ValueError("`keep_input` must be 0 or 1 dimensions.") 656 return keep_input 657 658 659def _dtypes(tensor_list_list): 660 all_types = [[t.dtype for t in tl] for tl in tensor_list_list] 661 types = all_types[0] 662 for other_types in all_types[1:]: 663 if other_types != types: 664 raise TypeError("Expected types to be consistent: %s vs. %s." % 665 (", ".join(x.name for x in types), 666 ", ".join(x.name for x in other_types))) 667 return types 668 669 670def _merge_shapes(shape_list, enqueue_many): 671 shape_list = [tensor_shape.as_shape(s) for s in shape_list] 672 if enqueue_many: 673 # We want the shapes without the leading batch dimension. 674 shape_list = [s.with_rank_at_least(1)[1:] for s in shape_list] 675 merged_shape = shape_list[0] 676 for s in shape_list[1:]: 677 merged_shape.merge_with(s) 678 return merged_shape.as_list() 679 680 681def _shapes(tensor_list_list, shapes, enqueue_many): 682 """Calculate and merge the shapes of incoming tensors. 683 684 Args: 685 tensor_list_list: List of tensor lists. 686 shapes: List of shape tuples corresponding to tensors within the lists. 687 enqueue_many: Boolean describing whether shapes will be enqueued as 688 batches or individual entries. 689 690 Returns: 691 A list of shapes aggregating shape inference info from `tensor_list_list`, 692 or returning `shapes` if it is not `None`. 693 694 Raises: 695 ValueError: If any of the inferred shapes in `tensor_list_list` lack a 696 well defined rank. 697 """ 698 if shapes is None: 699 len0 = len(tensor_list_list[0]) 700 701 for tl in tensor_list_list: 702 for i in xrange(len0): 703 if tl[i].shape.ndims is None: 704 raise ValueError("Cannot infer Tensor's rank: %s" % tl[i]) 705 706 shapes = [_merge_shapes( 707 [tl[i].shape.as_list() for tl in tensor_list_list], enqueue_many) 708 for i in xrange(len0)] 709 return shapes 710 711 712def _select_which_to_enqueue(tensor_list, keep_input): 713 """Select which examples to enqueue based on vector `keep_input`.""" 714 select_i = math_ops.cast(keep_input, dtypes.int32) 715 tensor_list = [ 716 data_flow_ops.dynamic_partition(x, select_i, num_partitions=2)[1] 717 for x in tensor_list] 718 return tensor_list 719 720 721def _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input): 722 """Enqueue `tensor_list_list` in `queue`.""" 723 if enqueue_many: 724 enqueue_fn = queue.enqueue_many 725 else: 726 enqueue_fn = queue.enqueue 727 if keep_input.shape.ndims == 1: 728 enqueue_ops = [enqueue_fn(_select_which_to_enqueue(x, keep_input)) 729 for x in tensor_list_list] 730 else: 731 enqueue_ops = [utils.smart_cond( 732 keep_input, 733 lambda: enqueue_fn(tl), # pylint:disable=cell-var-from-loop 734 control_flow_ops.no_op) for tl in tensor_list_list] 735 queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops)) 736 737 738def _enqueue(queue, tensor_list, threads, enqueue_many, keep_input): 739 """Enqueue `tensor_list` in `queue`.""" 740 if enqueue_many: 741 enqueue_fn = queue.enqueue_many 742 else: 743 enqueue_fn = queue.enqueue 744 if keep_input.shape.ndims == 1: 745 enqueue_ops = [ 746 enqueue_fn(_select_which_to_enqueue(tensor_list, keep_input))] * threads 747 else: 748 enqueue_ops = [utils.smart_cond( 749 keep_input, 750 lambda: enqueue_fn(tensor_list), 751 control_flow_ops.no_op)] * threads 752 queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops)) 753 754 755def _which_queue(dynamic_pad): 756 return (data_flow_ops.PaddingFIFOQueue if dynamic_pad 757 else data_flow_ops.FIFOQueue) 758 759 760def _batch(tensors, batch_size, keep_input, num_threads=1, capacity=32, 761 enqueue_many=False, shapes=None, dynamic_pad=False, 762 allow_smaller_final_batch=False, shared_name=None, 763 name=None): 764 """Helper function for `batch` and `maybe_batch`.""" 765 if context.executing_eagerly(): 766 raise ValueError( 767 "Input pipelines based on Queues are not supported when eager execution" 768 " is enabled. Please use tf.data to ingest data into your model" 769 " instead.") 770 tensor_list = _as_tensor_list(tensors) 771 with ops.name_scope(name, "batch", list(tensor_list) + [keep_input]) as name: 772 tensor_list = _validate(tensor_list) 773 keep_input = _validate_keep_input(keep_input, enqueue_many) 774 (tensor_list, sparse_info) = _store_sparse_tensors( 775 tensor_list, enqueue_many, keep_input) 776 types = _dtypes([tensor_list]) 777 shapes = _shapes([tensor_list], shapes, enqueue_many) 778 # TODO(josh11b,mrry): Switch to BatchQueue once it is written. 779 queue = _which_queue(dynamic_pad)( 780 capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name) 781 _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input) 782 summary.scalar( 783 "fraction_of_%d_full" % capacity, 784 math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity)) 785 786 if allow_smaller_final_batch: 787 dequeued = queue.dequeue_up_to(batch_size, name=name) 788 else: 789 dequeued = queue.dequeue_many(batch_size, name=name) 790 dequeued = _restore_sparse_tensors(dequeued, sparse_info) 791 return _as_original_type(tensors, dequeued) 792 793 794# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be 795# a multiple of len(tensor_list_list)?) parameter, to address the use 796# case where you want more parallelism than you can support different 797# readers (either because you don't have that many files or can't 798# read that many files in parallel due to the number of seeks required). 799# Once this is done, batch() can be written as a call to batch_join(). 800def _batch_join(tensors_list, batch_size, keep_input, capacity=32, 801 enqueue_many=False, shapes=None, dynamic_pad=False, 802 allow_smaller_final_batch=False, shared_name=None, name=None): 803 """Helper function for `batch_join` and `maybe_batch_join`.""" 804 if context.executing_eagerly(): 805 raise ValueError( 806 "Input pipelines based on Queues are not supported when eager execution" 807 " is enabled. Please use tf.data to ingest data into your model" 808 " instead.") 809 tensor_list_list = _as_tensor_list_list(tensors_list) 810 with ops.name_scope(name, "batch_join", 811 _flatten(tensor_list_list) + [keep_input]) as name: 812 tensor_list_list = _validate_join(tensor_list_list) 813 keep_input = _validate_keep_input(keep_input, enqueue_many) 814 tensor_list_list, sparse_info = _store_sparse_tensors_join( 815 tensor_list_list, enqueue_many, keep_input) 816 types = _dtypes(tensor_list_list) 817 shapes = _shapes(tensor_list_list, shapes, enqueue_many) 818 # TODO(josh11b,mrry): Switch to BatchQueue once it is written. 819 queue = _which_queue(dynamic_pad)( 820 capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name) 821 _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input) 822 summary.scalar( 823 "fraction_of_%d_full" % capacity, 824 math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity)) 825 826 if allow_smaller_final_batch: 827 dequeued = queue.dequeue_up_to(batch_size, name=name) 828 else: 829 dequeued = queue.dequeue_many(batch_size, name=name) 830 dequeued = _restore_sparse_tensors(dequeued, sparse_info) 831 # tensors_list was validated to not be empty. 832 return _as_original_type(tensors_list[0], dequeued) 833 834 835def _shuffle_batch(tensors, batch_size, capacity, min_after_dequeue, 836 keep_input, num_threads=1, seed=None, enqueue_many=False, 837 shapes=None, allow_smaller_final_batch=False, 838 shared_name=None, name=None): 839 """Helper function for `shuffle_batch` and `maybe_shuffle_batch`.""" 840 if context.executing_eagerly(): 841 raise ValueError( 842 "Input pipelines based on Queues are not supported when eager execution" 843 " is enabled. Please use tf.data to ingest data into your model" 844 " instead.") 845 tensor_list = _as_tensor_list(tensors) 846 with ops.name_scope(name, "shuffle_batch", 847 list(tensor_list) + [keep_input]) as name: 848 if capacity <= min_after_dequeue: 849 raise ValueError("capacity %d must be bigger than min_after_dequeue %d." 850 % (capacity, min_after_dequeue)) 851 tensor_list = _validate(tensor_list) 852 keep_input = _validate_keep_input(keep_input, enqueue_many) 853 tensor_list, sparse_info = _store_sparse_tensors( 854 tensor_list, enqueue_many, keep_input) 855 types = _dtypes([tensor_list]) 856 shapes = _shapes([tensor_list], shapes, enqueue_many) 857 queue = data_flow_ops.RandomShuffleQueue( 858 capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed, 859 dtypes=types, shapes=shapes, shared_name=shared_name) 860 _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input) 861 full = (math_ops.cast( 862 math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) * 863 (1. / (capacity - min_after_dequeue))) 864 # Note that name contains a '/' at the end so we intentionally do not place 865 # a '/' after %s below. 866 summary_name = ( 867 "fraction_over_%d_of_%d_full" % 868 (min_after_dequeue, capacity - min_after_dequeue)) 869 summary.scalar(summary_name, full) 870 871 if allow_smaller_final_batch: 872 dequeued = queue.dequeue_up_to(batch_size, name=name) 873 else: 874 dequeued = queue.dequeue_many(batch_size, name=name) 875 dequeued = _restore_sparse_tensors(dequeued, sparse_info) 876 return _as_original_type(tensors, dequeued) 877 878 879def _shuffle_batch_join(tensors_list, batch_size, capacity, 880 min_after_dequeue, keep_input, seed=None, 881 enqueue_many=False, shapes=None, 882 allow_smaller_final_batch=False, shared_name=None, 883 name=None): 884 """Helper function for `shuffle_batch_join` and `maybe_shuffle_batch_join`.""" 885 if context.executing_eagerly(): 886 raise ValueError( 887 "Input pipelines based on Queues are not supported when eager execution" 888 " is enabled. Please use tf.data to ingest data into your model" 889 " instead.") 890 tensor_list_list = _as_tensor_list_list(tensors_list) 891 with ops.name_scope(name, "shuffle_batch_join", 892 _flatten(tensor_list_list) + [keep_input]) as name: 893 tensor_list_list = _validate_join(tensor_list_list) 894 keep_input = _validate_keep_input(keep_input, enqueue_many) 895 tensor_list_list, sparse_info = _store_sparse_tensors_join( 896 tensor_list_list, enqueue_many, keep_input) 897 types = _dtypes(tensor_list_list) 898 shapes = _shapes(tensor_list_list, shapes, enqueue_many) 899 queue = data_flow_ops.RandomShuffleQueue( 900 capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed, 901 dtypes=types, shapes=shapes, shared_name=shared_name) 902 _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input) 903 full = (math_ops.cast( 904 math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) * 905 (1. / (capacity - min_after_dequeue))) 906 # Note that name contains a '/' at the end so we intentionally do not place 907 # a '/' after %s below. 908 summary_name = ( 909 "fraction_over_%d_of_%d_full" % 910 (min_after_dequeue, capacity - min_after_dequeue)) 911 summary.scalar(summary_name, full) 912 913 if allow_smaller_final_batch: 914 dequeued = queue.dequeue_up_to(batch_size, name=name) 915 else: 916 dequeued = queue.dequeue_many(batch_size, name=name) 917 dequeued = _restore_sparse_tensors(dequeued, sparse_info) 918 # tensors_list was validated to not be empty. 919 return _as_original_type(tensors_list[0], dequeued) 920 921# Batching functions ---------------------------------------------------------- 922 923 924@tf_export(v1=["train.batch"]) 925@deprecation.deprecated( 926 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 927 "`tf.data.Dataset.batch(batch_size)` (or `padded_batch(...)` if " 928 "`dynamic_pad=True`).") 929def batch(tensors, batch_size, num_threads=1, capacity=32, 930 enqueue_many=False, shapes=None, dynamic_pad=False, 931 allow_smaller_final_batch=False, shared_name=None, name=None): 932 """Creates batches of tensors in `tensors`. 933 934 The argument `tensors` can be a list or a dictionary of tensors. 935 The value returned by the function will be of the same type 936 as `tensors`. 937 938 This function is implemented using a queue. A `QueueRunner` for the 939 queue is added to the current `Graph`'s `QUEUE_RUNNER` collection. 940 941 If `enqueue_many` is `False`, `tensors` is assumed to represent a single 942 example. An input tensor with shape `[x, y, z]` will be output as a tensor 943 with shape `[batch_size, x, y, z]`. 944 945 If `enqueue_many` is `True`, `tensors` is assumed to represent a batch of 946 examples, where the first dimension is indexed by example, and all members of 947 `tensors` should have the same size in the first dimension. If an input 948 tensor has shape `[*, x, y, z]`, the output will have shape `[batch_size, x, 949 y, z]`. The `capacity` argument controls the how long the prefetching is 950 allowed to grow the queues. 951 952 The returned operation is a dequeue operation and will throw 953 `tf.errors.OutOfRangeError` if the input queue is exhausted. If this 954 operation is feeding another input queue, its queue runner will catch 955 this exception, however, if this operation is used in your main thread 956 you are responsible for catching this yourself. 957 958 *N.B.:* If `dynamic_pad` is `False`, you must ensure that either 959 (i) the `shapes` argument is passed, or (ii) all of the tensors in 960 `tensors` must have fully-defined shapes. `ValueError` will be 961 raised if neither of these conditions holds. 962 963 If `dynamic_pad` is `True`, it is sufficient that the *rank* of the 964 tensors is known, but individual dimensions may have shape `None`. 965 In this case, for each enqueue the dimensions with value `None` 966 may have a variable length; upon dequeue, the output tensors will be padded 967 on the right to the maximum shape of the tensors in the current minibatch. 968 For numbers, this padding takes value 0. For strings, this padding is 969 the empty string. See `PaddingFIFOQueue` for more info. 970 971 If `allow_smaller_final_batch` is `True`, a smaller batch value than 972 `batch_size` is returned when the queue is closed and there are not enough 973 elements to fill the batch, otherwise the pending elements are discarded. 974 In addition, all output tensors' static shapes, as accessed via the 975 `shape` property will have a first `Dimension` value of `None`, and 976 operations that depend on fixed batch_size would fail. 977 978 Args: 979 tensors: The list or dictionary of tensors to enqueue. 980 batch_size: The new batch size pulled from the queue. 981 num_threads: The number of threads enqueuing `tensors`. The batching will 982 be nondeterministic if `num_threads > 1`. 983 capacity: An integer. The maximum number of elements in the queue. 984 enqueue_many: Whether each tensor in `tensors` is a single example. 985 shapes: (Optional) The shapes for each example. Defaults to the 986 inferred shapes for `tensors`. 987 dynamic_pad: Boolean. Allow variable dimensions in input shapes. 988 The given dimensions are padded upon dequeue so that tensors within a 989 batch have the same shapes. 990 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 991 batch to be smaller if there are insufficient items left in the queue. 992 shared_name: (Optional). If set, this queue will be shared under the given 993 name across multiple sessions. 994 name: (Optional) A name for the operations. 995 996 Returns: 997 A list or dictionary of tensors with the same types as `tensors` (except if 998 the input is a list of one element, then it returns a tensor, not a list). 999 1000 Raises: 1001 ValueError: If the `shapes` are not specified, and cannot be 1002 inferred from the elements of `tensors`. 1003 1004 @compatibility(eager) 1005 Input pipelines based on Queues are not supported when eager execution is 1006 enabled. Please use the `tf.data` API to ingest data under eager execution. 1007 @end_compatibility 1008 """ 1009 return _batch( 1010 tensors, 1011 batch_size, 1012 keep_input=True, 1013 num_threads=num_threads, 1014 capacity=capacity, 1015 enqueue_many=enqueue_many, 1016 shapes=shapes, 1017 dynamic_pad=dynamic_pad, 1018 allow_smaller_final_batch=allow_smaller_final_batch, 1019 shared_name=shared_name, 1020 name=name) 1021 1022 1023@tf_export(v1=["train.maybe_batch"]) 1024@deprecation.deprecated( 1025 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1026 "`tf.data.Dataset.filter(...).batch(batch_size)` (or `padded_batch(...)`" 1027 " if `dynamic_pad=True`).") 1028def maybe_batch(tensors, keep_input, batch_size, num_threads=1, capacity=32, 1029 enqueue_many=False, shapes=None, dynamic_pad=False, 1030 allow_smaller_final_batch=False, shared_name=None, name=None): 1031 """Conditionally creates batches of tensors based on `keep_input`. 1032 1033 See docstring in `batch` for more details. 1034 1035 Args: 1036 tensors: The list or dictionary of tensors to enqueue. 1037 keep_input: A `bool` Tensor. This tensor controls whether the input is 1038 added to the queue or not. If it is a scalar and evaluates `True`, then 1039 `tensors` are all added to the queue. If it is a vector and `enqueue_many` 1040 is `True`, then each example is added to the queue only if the 1041 corresponding value in `keep_input` is `True`. This tensor essentially 1042 acts as a filtering mechanism. 1043 batch_size: The new batch size pulled from the queue. 1044 num_threads: The number of threads enqueuing `tensors`. The batching will 1045 be nondeterministic if `num_threads > 1`. 1046 capacity: An integer. The maximum number of elements in the queue. 1047 enqueue_many: Whether each tensor in `tensors` is a single example. 1048 shapes: (Optional) The shapes for each example. Defaults to the 1049 inferred shapes for `tensors`. 1050 dynamic_pad: Boolean. Allow variable dimensions in input shapes. 1051 The given dimensions are padded upon dequeue so that tensors within a 1052 batch have the same shapes. 1053 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1054 batch to be smaller if there are insufficient items left in the queue. 1055 shared_name: (Optional). If set, this queue will be shared under the given 1056 name across multiple sessions. 1057 name: (Optional) A name for the operations. 1058 1059 Returns: 1060 A list or dictionary of tensors with the same types as `tensors`. 1061 1062 Raises: 1063 ValueError: If the `shapes` are not specified, and cannot be 1064 inferred from the elements of `tensors`. 1065 """ 1066 return _batch( 1067 tensors, 1068 batch_size, 1069 keep_input, 1070 num_threads=num_threads, 1071 capacity=capacity, 1072 enqueue_many=enqueue_many, 1073 shapes=shapes, 1074 dynamic_pad=dynamic_pad, 1075 allow_smaller_final_batch=allow_smaller_final_batch, 1076 shared_name=shared_name, 1077 name=name) 1078 1079 1080@tf_export(v1=["train.batch_join"]) 1081@deprecation.deprecated( 1082 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1083 "`tf.data.Dataset.interleave(...).batch(batch_size)` (or " 1084 "`padded_batch(...)` if `dynamic_pad=True`).") 1085def batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False, 1086 shapes=None, dynamic_pad=False, allow_smaller_final_batch=False, 1087 shared_name=None, name=None): 1088 """Runs a list of tensors to fill a queue to create batches of examples. 1089 1090 The `tensors_list` argument is a list of tuples of tensors, or a list of 1091 dictionaries of tensors. Each element in the list is treated similarly 1092 to the `tensors` argument of `tf.compat.v1.train.batch()`. 1093 1094 WARNING: This function is nondeterministic, since it starts a separate thread 1095 for each tensor. 1096 1097 Enqueues a different list of tensors in different threads. 1098 Implemented using a queue -- a `QueueRunner` for the queue 1099 is added to the current `Graph`'s `QUEUE_RUNNER` collection. 1100 1101 `len(tensors_list)` threads will be started, 1102 with thread `i` enqueuing the tensors from 1103 `tensors_list[i]`. `tensors_list[i1][j]` must match 1104 `tensors_list[i2][j]` in type and shape, except in the first 1105 dimension if `enqueue_many` is true. 1106 1107 If `enqueue_many` is `False`, each `tensors_list[i]` is assumed 1108 to represent a single example. An input tensor `x` will be output as a 1109 tensor with shape `[batch_size] + x.shape`. 1110 1111 If `enqueue_many` is `True`, `tensors_list[i]` is assumed to 1112 represent a batch of examples, where the first dimension is indexed 1113 by example, and all members of `tensors_list[i]` should have the 1114 same size in the first dimension. The slices of any input tensor 1115 `x` are treated as examples, and the output tensors will have shape 1116 `[batch_size] + x.shape[1:]`. 1117 1118 The `capacity` argument controls the how long the prefetching is allowed to 1119 grow the queues. 1120 1121 The returned operation is a dequeue operation and will throw 1122 `tf.errors.OutOfRangeError` if the input queue is exhausted. If this 1123 operation is feeding another input queue, its queue runner will catch 1124 this exception, however, if this operation is used in your main thread 1125 you are responsible for catching this yourself. 1126 1127 *N.B.:* If `dynamic_pad` is `False`, you must ensure that either 1128 (i) the `shapes` argument is passed, or (ii) all of the tensors in 1129 `tensors_list` must have fully-defined shapes. `ValueError` will be 1130 raised if neither of these conditions holds. 1131 1132 If `dynamic_pad` is `True`, it is sufficient that the *rank* of the 1133 tensors is known, but individual dimensions may have value `None`. 1134 In this case, for each enqueue the dimensions with value `None` 1135 may have a variable length; upon dequeue, the output tensors will be padded 1136 on the right to the maximum shape of the tensors in the current minibatch. 1137 For numbers, this padding takes value 0. For strings, this padding is 1138 the empty string. See `PaddingFIFOQueue` for more info. 1139 1140 If `allow_smaller_final_batch` is `True`, a smaller batch value than 1141 `batch_size` is returned when the queue is closed and there are not enough 1142 elements to fill the batch, otherwise the pending elements are discarded. 1143 In addition, all output tensors' static shapes, as accessed via the 1144 `shape` property will have a first `Dimension` value of `None`, and 1145 operations that depend on fixed batch_size would fail. 1146 1147 Args: 1148 tensors_list: A list of tuples or dictionaries of tensors to enqueue. 1149 batch_size: An integer. The new batch size pulled from the queue. 1150 capacity: An integer. The maximum number of elements in the queue. 1151 enqueue_many: Whether each tensor in `tensor_list_list` is a single 1152 example. 1153 shapes: (Optional) The shapes for each example. Defaults to the 1154 inferred shapes for `tensor_list_list[i]`. 1155 dynamic_pad: Boolean. Allow variable dimensions in input shapes. 1156 The given dimensions are padded upon dequeue so that tensors within a 1157 batch have the same shapes. 1158 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1159 batch to be smaller if there are insufficient items left in the queue. 1160 shared_name: (Optional) If set, this queue will be shared under the given 1161 name across multiple sessions. 1162 name: (Optional) A name for the operations. 1163 1164 Returns: 1165 A list or dictionary of tensors with the same number and types as 1166 `tensors_list[i]`. 1167 1168 Raises: 1169 ValueError: If the `shapes` are not specified, and cannot be 1170 inferred from the elements of `tensor_list_list`. 1171 1172 @compatibility(eager) 1173 Input pipelines based on Queues are not supported when eager execution is 1174 enabled. Please use the `tf.data` API to ingest data under eager execution. 1175 @end_compatibility 1176 """ 1177 return _batch_join( 1178 tensors_list, 1179 batch_size, 1180 keep_input=True, 1181 capacity=capacity, 1182 enqueue_many=enqueue_many, 1183 shapes=shapes, 1184 dynamic_pad=dynamic_pad, 1185 allow_smaller_final_batch=allow_smaller_final_batch, 1186 shared_name=shared_name, 1187 name=name) 1188 1189 1190@tf_export(v1=["train.maybe_batch_join"]) 1191@deprecation.deprecated( 1192 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1193 "`tf.data.Dataset.interleave(...).filter(...).batch(batch_size)` (or " 1194 "`padded_batch(...)` if `dynamic_pad=True`).") 1195def maybe_batch_join(tensors_list, keep_input, batch_size, capacity=32, 1196 enqueue_many=False, shapes=None, dynamic_pad=False, 1197 allow_smaller_final_batch=False, shared_name=None, 1198 name=None): 1199 """Runs a list of tensors to conditionally fill a queue to create batches. 1200 1201 See docstring in `batch_join` for more details. 1202 1203 Args: 1204 tensors_list: A list of tuples or dictionaries of tensors to enqueue. 1205 keep_input: A `bool` Tensor. This tensor controls whether the input is 1206 added to the queue or not. If it is a scalar and evaluates `True`, then 1207 `tensors` are all added to the queue. If it is a vector and `enqueue_many` 1208 is `True`, then each example is added to the queue only if the 1209 corresponding value in `keep_input` is `True`. This tensor essentially 1210 acts as a filtering mechanism. 1211 batch_size: An integer. The new batch size pulled from the queue. 1212 capacity: An integer. The maximum number of elements in the queue. 1213 enqueue_many: Whether each tensor in `tensor_list_list` is a single 1214 example. 1215 shapes: (Optional) The shapes for each example. Defaults to the 1216 inferred shapes for `tensor_list_list[i]`. 1217 dynamic_pad: Boolean. Allow variable dimensions in input shapes. 1218 The given dimensions are padded upon dequeue so that tensors within a 1219 batch have the same shapes. 1220 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1221 batch to be smaller if there are insufficient items left in the queue. 1222 shared_name: (Optional) If set, this queue will be shared under the given 1223 name across multiple sessions. 1224 name: (Optional) A name for the operations. 1225 1226 Returns: 1227 A list or dictionary of tensors with the same number and types as 1228 `tensors_list[i]`. 1229 1230 Raises: 1231 ValueError: If the `shapes` are not specified, and cannot be 1232 inferred from the elements of `tensor_list_list`. 1233 """ 1234 return _batch_join( 1235 tensors_list, 1236 batch_size, 1237 keep_input, 1238 capacity=capacity, 1239 enqueue_many=enqueue_many, 1240 shapes=shapes, 1241 dynamic_pad=dynamic_pad, 1242 allow_smaller_final_batch=allow_smaller_final_batch, 1243 shared_name=shared_name, 1244 name=name) 1245 1246 1247@tf_export(v1=["train.shuffle_batch"]) 1248@deprecation.deprecated( 1249 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1250 "`tf.data.Dataset.shuffle(min_after_dequeue).batch(batch_size)`.") 1251def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue, 1252 num_threads=1, seed=None, enqueue_many=False, shapes=None, 1253 allow_smaller_final_batch=False, shared_name=None, name=None): 1254 """Creates batches by randomly shuffling tensors. 1255 1256 This function adds the following to the current `Graph`: 1257 1258 * A shuffling queue into which tensors from `tensors` are enqueued. 1259 * A `dequeue_many` operation to create batches from the queue. 1260 * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors 1261 from `tensors`. 1262 1263 If `enqueue_many` is `False`, `tensors` is assumed to represent a 1264 single example. An input tensor with shape `[x, y, z]` will be output 1265 as a tensor with shape `[batch_size, x, y, z]`. 1266 1267 If `enqueue_many` is `True`, `tensors` is assumed to represent a 1268 batch of examples, where the first dimension is indexed by example, 1269 and all members of `tensors` should have the same size in the 1270 first dimension. If an input tensor has shape `[*, x, y, z]`, the 1271 output will have shape `[batch_size, x, y, z]`. 1272 1273 The `capacity` argument controls the how long the prefetching is allowed to 1274 grow the queues. 1275 1276 The returned operation is a dequeue operation and will throw 1277 `tf.errors.OutOfRangeError` if the input queue is exhausted. If this 1278 operation is feeding another input queue, its queue runner will catch 1279 this exception, however, if this operation is used in your main thread 1280 you are responsible for catching this yourself. 1281 1282 For example: 1283 1284 ```python 1285 # Creates batches of 32 images and 32 labels. 1286 image_batch, label_batch = tf.compat.v1.train.shuffle_batch( 1287 [single_image, single_label], 1288 batch_size=32, 1289 num_threads=4, 1290 capacity=50000, 1291 min_after_dequeue=10000) 1292 ``` 1293 1294 *N.B.:* You must ensure that either (i) the `shapes` argument is 1295 passed, or (ii) all of the tensors in `tensors` must have 1296 fully-defined shapes. `ValueError` will be raised if neither of 1297 these conditions holds. 1298 1299 If `allow_smaller_final_batch` is `True`, a smaller batch value than 1300 `batch_size` is returned when the queue is closed and there are not enough 1301 elements to fill the batch, otherwise the pending elements are discarded. 1302 In addition, all output tensors' static shapes, as accessed via the 1303 `shape` property will have a first `Dimension` value of `None`, and 1304 operations that depend on fixed batch_size would fail. 1305 1306 Args: 1307 tensors: The list or dictionary of tensors to enqueue. 1308 batch_size: The new batch size pulled from the queue. 1309 capacity: An integer. The maximum number of elements in the queue. 1310 min_after_dequeue: Minimum number elements in the queue after a 1311 dequeue, used to ensure a level of mixing of elements. 1312 num_threads: The number of threads enqueuing `tensor_list`. 1313 seed: Seed for the random shuffling within the queue. 1314 enqueue_many: Whether each tensor in `tensor_list` is a single example. 1315 shapes: (Optional) The shapes for each example. Defaults to the 1316 inferred shapes for `tensor_list`. 1317 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1318 batch to be smaller if there are insufficient items left in the queue. 1319 shared_name: (Optional) If set, this queue will be shared under the given 1320 name across multiple sessions. 1321 name: (Optional) A name for the operations. 1322 1323 Returns: 1324 A list or dictionary of tensors with the types as `tensors`. 1325 1326 Raises: 1327 ValueError: If the `shapes` are not specified, and cannot be 1328 inferred from the elements of `tensors`. 1329 1330 @compatibility(eager) 1331 Input pipelines based on Queues are not supported when eager execution is 1332 enabled. Please use the `tf.data` API to ingest data under eager execution. 1333 @end_compatibility 1334 """ 1335 return _shuffle_batch( 1336 tensors, 1337 batch_size, 1338 capacity, 1339 min_after_dequeue, 1340 keep_input=True, 1341 num_threads=num_threads, 1342 seed=seed, 1343 enqueue_many=enqueue_many, 1344 shapes=shapes, 1345 allow_smaller_final_batch=allow_smaller_final_batch, 1346 shared_name=shared_name, 1347 name=name) 1348 1349 1350@tf_export(v1=["train.maybe_shuffle_batch"]) 1351@deprecation.deprecated( 1352 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1353 "`tf.data.Dataset.filter(...).shuffle(min_after_dequeue).batch(batch_size)`" 1354 ".") 1355def maybe_shuffle_batch(tensors, batch_size, capacity, min_after_dequeue, 1356 keep_input, num_threads=1, seed=None, 1357 enqueue_many=False, shapes=None, 1358 allow_smaller_final_batch=False, shared_name=None, 1359 name=None): 1360 """Creates batches by randomly shuffling conditionally-enqueued tensors. 1361 1362 See docstring in `shuffle_batch` for more details. 1363 1364 Args: 1365 tensors: The list or dictionary of tensors to enqueue. 1366 batch_size: The new batch size pulled from the queue. 1367 capacity: An integer. The maximum number of elements in the queue. 1368 min_after_dequeue: Minimum number elements in the queue after a 1369 dequeue, used to ensure a level of mixing of elements. 1370 keep_input: A `bool` Tensor. This tensor controls whether the input is 1371 added to the queue or not. If it is a scalar and evaluates `True`, then 1372 `tensors` are all added to the queue. If it is a vector and `enqueue_many` 1373 is `True`, then each example is added to the queue only if the 1374 corresponding value in `keep_input` is `True`. This tensor essentially 1375 acts as a filtering mechanism. 1376 num_threads: The number of threads enqueuing `tensor_list`. 1377 seed: Seed for the random shuffling within the queue. 1378 enqueue_many: Whether each tensor in `tensor_list` is a single example. 1379 shapes: (Optional) The shapes for each example. Defaults to the 1380 inferred shapes for `tensor_list`. 1381 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1382 batch to be smaller if there are insufficient items left in the queue. 1383 shared_name: (Optional) If set, this queue will be shared under the given 1384 name across multiple sessions. 1385 name: (Optional) A name for the operations. 1386 1387 Returns: 1388 A list or dictionary of tensors with the types as `tensors`. 1389 1390 Raises: 1391 ValueError: If the `shapes` are not specified, and cannot be 1392 inferred from the elements of `tensors`. 1393 1394 @compatibility(eager) 1395 Input pipelines based on Queues are not supported when eager execution is 1396 enabled. Please use the `tf.data` API to ingest data under eager execution. 1397 @end_compatibility 1398 """ 1399 return _shuffle_batch( 1400 tensors, 1401 batch_size, 1402 capacity, 1403 min_after_dequeue, 1404 keep_input, 1405 num_threads=num_threads, 1406 seed=seed, 1407 enqueue_many=enqueue_many, 1408 shapes=shapes, 1409 allow_smaller_final_batch=allow_smaller_final_batch, 1410 shared_name=shared_name, 1411 name=name) 1412 1413 1414@tf_export(v1=["train.shuffle_batch_join"]) 1415@deprecation.deprecated( 1416 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1417 "`tf.data.Dataset.interleave(...).shuffle(min_after_dequeue).batch" 1418 "(batch_size)`.") 1419def shuffle_batch_join(tensors_list, batch_size, capacity, 1420 min_after_dequeue, seed=None, enqueue_many=False, 1421 shapes=None, allow_smaller_final_batch=False, 1422 shared_name=None, name=None): 1423 """Create batches by randomly shuffling tensors. 1424 1425 The `tensors_list` argument is a list of tuples of tensors, or a list of 1426 dictionaries of tensors. Each element in the list is treated similarly 1427 to the `tensors` argument of `tf.compat.v1.train.shuffle_batch()`. 1428 1429 This version enqueues a different list of tensors in different threads. 1430 It adds the following to the current `Graph`: 1431 1432 * A shuffling queue into which tensors from `tensors_list` are enqueued. 1433 * A `dequeue_many` operation to create batches from the queue. 1434 * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors 1435 from `tensors_list`. 1436 1437 `len(tensors_list)` threads will be started, with thread `i` enqueuing 1438 the tensors from `tensors_list[i]`. `tensors_list[i1][j]` must match 1439 `tensors_list[i2][j]` in type and shape, except in the first dimension if 1440 `enqueue_many` is true. 1441 1442 If `enqueue_many` is `False`, each `tensors_list[i]` is assumed 1443 to represent a single example. An input tensor with shape `[x, y, z]` 1444 will be output as a tensor with shape `[batch_size, x, y, z]`. 1445 1446 If `enqueue_many` is `True`, `tensors_list[i]` is assumed to 1447 represent a batch of examples, where the first dimension is indexed 1448 by example, and all members of `tensors_list[i]` should have the 1449 same size in the first dimension. If an input tensor has shape `[*, x, 1450 y, z]`, the output will have shape `[batch_size, x, y, z]`. 1451 1452 The `capacity` argument controls the how long the prefetching is allowed to 1453 grow the queues. 1454 1455 The returned operation is a dequeue operation and will throw 1456 `tf.errors.OutOfRangeError` if the input queue is exhausted. If this 1457 operation is feeding another input queue, its queue runner will catch 1458 this exception, however, if this operation is used in your main thread 1459 you are responsible for catching this yourself. 1460 1461 If `allow_smaller_final_batch` is `True`, a smaller batch value than 1462 `batch_size` is returned when the queue is closed and there are not enough 1463 elements to fill the batch, otherwise the pending elements are discarded. 1464 In addition, all output tensors' static shapes, as accessed via the 1465 `shape` property will have a first `Dimension` value of `None`, and 1466 operations that depend on fixed batch_size would fail. 1467 1468 Args: 1469 tensors_list: A list of tuples or dictionaries of tensors to enqueue. 1470 batch_size: An integer. The new batch size pulled from the queue. 1471 capacity: An integer. The maximum number of elements in the queue. 1472 min_after_dequeue: Minimum number elements in the queue after a 1473 dequeue, used to ensure a level of mixing of elements. 1474 seed: Seed for the random shuffling within the queue. 1475 enqueue_many: Whether each tensor in `tensor_list_list` is a single 1476 example. 1477 shapes: (Optional) The shapes for each example. Defaults to the 1478 inferred shapes for `tensors_list[i]`. 1479 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1480 batch to be smaller if there are insufficient items left in the queue. 1481 shared_name: (optional). If set, this queue will be shared under the given 1482 name across multiple sessions. 1483 name: (Optional) A name for the operations. 1484 1485 Returns: 1486 A list or dictionary of tensors with the same number and types as 1487 `tensors_list[i]`. 1488 1489 Raises: 1490 ValueError: If the `shapes` are not specified, and cannot be 1491 inferred from the elements of `tensors_list`. 1492 1493 @compatibility(eager) 1494 Input pipelines based on Queues are not supported when eager execution is 1495 enabled. Please use the `tf.data` API to ingest data under eager execution. 1496 @end_compatibility 1497 """ 1498 return _shuffle_batch_join( 1499 tensors_list, 1500 batch_size, 1501 capacity, 1502 min_after_dequeue, 1503 keep_input=True, 1504 seed=seed, 1505 enqueue_many=enqueue_many, 1506 shapes=shapes, 1507 allow_smaller_final_batch=allow_smaller_final_batch, 1508 shared_name=shared_name, 1509 name=name) 1510 1511 1512@tf_export(v1=["train.maybe_shuffle_batch_join"]) 1513@deprecation.deprecated( 1514 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1515 "`tf.data.Dataset.interleave(...).filter(...).shuffle(min_after_dequeue)" 1516 ".batch(batch_size)`.") 1517def maybe_shuffle_batch_join(tensors_list, batch_size, capacity, 1518 min_after_dequeue, keep_input, seed=None, 1519 enqueue_many=False, shapes=None, 1520 allow_smaller_final_batch=False, shared_name=None, 1521 name=None): 1522 """Create batches by randomly shuffling conditionally-enqueued tensors. 1523 1524 See docstring in `shuffle_batch_join` for more details. 1525 1526 Args: 1527 tensors_list: A list of tuples or dictionaries of tensors to enqueue. 1528 batch_size: An integer. The new batch size pulled from the queue. 1529 capacity: An integer. The maximum number of elements in the queue. 1530 min_after_dequeue: Minimum number elements in the queue after a 1531 dequeue, used to ensure a level of mixing of elements. 1532 keep_input: A `bool` Tensor. This tensor controls whether the input is 1533 added to the queue or not. If it is a scalar and evaluates `True`, then 1534 `tensors` are all added to the queue. If it is a vector and `enqueue_many` 1535 is `True`, then each example is added to the queue only if the 1536 corresponding value in `keep_input` is `True`. This tensor essentially 1537 acts as a filtering mechanism. 1538 seed: Seed for the random shuffling within the queue. 1539 enqueue_many: Whether each tensor in `tensor_list_list` is a single 1540 example. 1541 shapes: (Optional) The shapes for each example. Defaults to the 1542 inferred shapes for `tensors_list[i]`. 1543 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1544 batch to be smaller if there are insufficient items left in the queue. 1545 shared_name: (optional). If set, this queue will be shared under the given 1546 name across multiple sessions. 1547 name: (Optional) A name for the operations. 1548 1549 Returns: 1550 A list or dictionary of tensors with the same number and types as 1551 `tensors_list[i]`. 1552 1553 Raises: 1554 ValueError: If the `shapes` are not specified, and cannot be 1555 inferred from the elements of `tensors_list`. 1556 1557 @compatibility(eager) 1558 Input pipelines based on Queues are not supported when eager execution is 1559 enabled. Please use the `tf.data` API to ingest data under eager execution. 1560 @end_compatibility 1561 """ 1562 return _shuffle_batch_join( 1563 tensors_list, 1564 batch_size, 1565 capacity, 1566 min_after_dequeue, 1567 keep_input, 1568 seed=seed, 1569 enqueue_many=enqueue_many, 1570 shapes=shapes, 1571 allow_smaller_final_batch=allow_smaller_final_batch, 1572 shared_name=shared_name, 1573 name=name) 1574