1# Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Methods to read data in the graph (deprecated). 16 17This module and all its submodules are deprecated. See 18[contrib/learn/README.md](https://www.tensorflow.org/code/tensorflow/contrib/learn/README.md) 19for migration instructions. 20""" 21 22from __future__ import absolute_import 23from __future__ import division 24from __future__ import print_function 25 26from tensorflow.contrib.input_pipeline.python.ops import input_pipeline_ops 27from tensorflow.python.framework import constant_op 28from tensorflow.python.framework import dtypes 29from tensorflow.python.framework import errors 30from tensorflow.python.framework import ops 31from tensorflow.python.framework import sparse_tensor 32from tensorflow.python.layers import utils 33from tensorflow.python.ops import array_ops 34from tensorflow.python.ops import data_flow_ops 35from tensorflow.python.ops import io_ops 36from tensorflow.python.ops import math_ops 37from tensorflow.python.ops import parsing_ops 38from tensorflow.python.platform import gfile 39from tensorflow.python.summary import summary 40from tensorflow.python.training import input as input_ops 41from tensorflow.python.training import queue_runner 42from tensorflow.python.util.deprecation import deprecated 43 44# Default name for key in the feature dict. 
KEY_FEATURE_NAME = '__key__'


@deprecated(None, 'Use tf.data.')
def read_batch_examples(file_pattern,
                        batch_size,
                        reader,
                        randomize_input=True,
                        num_epochs=None,
                        queue_capacity=10000,
                        num_threads=1,
                        read_batch_size=1,
                        parse_fn=None,
                        name=None,
                        seed=None):
  """Adds operations to read, queue, batch `Example` protos.

  Given file pattern (or list of files), will setup a queue for file names,
  read `Example` proto using provided `reader`, use batch queue to create
  batches of examples of size `batch_size`.

  All queue runners are added to the queue runners collection, and may be
  started via `start_queue_runners`.

  All ops are added to the default graph.

  Use `parse_fn` if you need to do parsing / processing on single examples.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    reader: A function or class that returns an object with
      `read` method, (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If `None`, cycles through the dataset forever.
      NOTE - If specified, creates a variable that must be initialized, so call
      `tf.local_variables_initializer()` and run the op in a session.
    queue_capacity: Capacity for input queue.
    num_threads: The number of threads enqueuing examples. In order to have
      predictable and repeatable order of reading and enqueueing, such as in
      prediction and evaluation mode, `num_threads` should be 1.
    read_batch_size: An int or scalar `Tensor` specifying the number of
      records to read at once.
    parse_fn: Parsing function, takes `Example` Tensor returns parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    seed: An integer (optional). Seed used if randomize_input == True.

  Returns:
    String `Tensor` of batched `Example` proto.

  Raises:
    ValueError: for invalid inputs.
  """
  # Thin wrapper: delegate to the keyed variant and drop the keys.
  _, examples = read_keyed_batch_examples(
      file_pattern=file_pattern,
      batch_size=batch_size,
      reader=reader,
      randomize_input=randomize_input,
      num_epochs=num_epochs,
      queue_capacity=queue_capacity,
      num_threads=num_threads,
      read_batch_size=read_batch_size,
      parse_fn=parse_fn,
      name=name,
      seed=seed)
  return examples


@deprecated(None, 'Use tf.data.')
def read_keyed_batch_examples(file_pattern,
                              batch_size,
                              reader,
                              randomize_input=True,
                              num_epochs=None,
                              queue_capacity=10000,
                              num_threads=1,
                              read_batch_size=1,
                              parse_fn=None,
                              name=None,
                              seed=None):
  """Adds operations to read, queue, batch `Example` protos.

  Given file pattern (or list of files), will setup a queue for file names,
  read `Example` proto using provided `reader`, use batch queue to create
  batches of examples of size `batch_size`.

  All queue runners are added to the queue runners collection, and may be
  started via `start_queue_runners`.

  All ops are added to the default graph.

  Use `parse_fn` if you need to do parsing / processing on single examples.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    reader: A function or class that returns an object with
      `read` method, (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If `None`, cycles through the dataset forever.
      NOTE - If specified, creates a variable that must be initialized, so call
      `tf.local_variables_initializer()` and run the op in a session.
    queue_capacity: Capacity for input queue.
    num_threads: The number of threads enqueuing examples. In order to have
      predictable and repeatable order of reading and enqueueing, such as in
      prediction and evaluation mode, `num_threads` should be 1.
    read_batch_size: An int or scalar `Tensor` specifying the number of
      records to read at once.
    parse_fn: Parsing function, takes `Example` Tensor returns parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    seed: An integer (optional). Seed used if randomize_input == True.

  Returns:
    Returns tuple of:
    - `Tensor` of string keys.
    - String `Tensor` of batched `Example` proto.

  Raises:
    ValueError: for invalid inputs.
  """
  return _read_keyed_batch_examples_helper(
      file_pattern,
      batch_size,
      reader,
      randomize_input=randomize_input,
      num_epochs=num_epochs,
      queue_capacity=queue_capacity,
      num_threads=num_threads,
      read_batch_size=read_batch_size,
      parse_fn=parse_fn,
      setup_shared_queue=False,
      name=name,
      seed=seed)


@deprecated(None, 'Use tf.data.')
def read_keyed_batch_examples_shared_queue(file_pattern,
                                           batch_size,
                                           reader,
                                           randomize_input=True,
                                           num_epochs=None,
                                           queue_capacity=10000,
                                           num_threads=1,
                                           read_batch_size=1,
                                           parse_fn=None,
                                           name=None,
                                           seed=None):
  """Adds operations to read, queue, batch `Example` protos.

  Given file pattern (or list of files), will setup a shared queue for file
  names, setup a worker queue that pulls from the shared queue, read `Example`
  protos using provided `reader`, use batch queue to create batches of examples
  of size `batch_size`. This provides at most once visit guarantees. Note that
  this only works if the parameter servers are not pre-empted or restarted or
  the session is not restored from a checkpoint since the state of a queue
  is not checkpointed and we will end up restarting from the entire list of
  files.

  All queue runners are added to the queue runners collection, and may be
  started via `start_queue_runners`.

  All ops are added to the default graph.

  Use `parse_fn` if you need to do parsing / processing on single examples.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    reader: A function or class that returns an object with
      `read` method, (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If `None`, cycles through the dataset forever.
      NOTE - If specified, creates a variable that must be initialized, so call
      `tf.local_variables_initializer()` and run the op in a session.
    queue_capacity: Capacity for input queue.
    num_threads: The number of threads enqueuing examples.
    read_batch_size: An int or scalar `Tensor` specifying the number of
      records to read at once.
    parse_fn: Parsing function, takes `Example` Tensor returns parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    seed: An integer (optional). Seed used if randomize_input == True.

  Returns:
    Returns tuple of:
    - `Tensor` of string keys.
    - String `Tensor` of batched `Example` proto.

  Raises:
    ValueError: for invalid inputs.
  """
  return _read_keyed_batch_examples_helper(
      file_pattern,
      batch_size,
      reader,
      randomize_input=randomize_input,
      num_epochs=num_epochs,
      queue_capacity=queue_capacity,
      num_threads=num_threads,
      read_batch_size=read_batch_size,
      parse_fn=parse_fn,
      setup_shared_queue=True,
      name=name,
      seed=seed)


def _get_file_names(file_pattern, randomize_input):
  """Parse list of file names from pattern, optionally shuffled.

  Args:
    file_pattern: File glob pattern, or list of glob patterns.
    randomize_input: Whether to shuffle the order of file names.

  Returns:
    List of file names matching `file_pattern`.

  Raises:
    ValueError: If `file_pattern` is empty, or pattern matches no files.
  """
  if isinstance(file_pattern, list):
    if not file_pattern:
      raise ValueError('No files given to dequeue_examples.')
    file_names = []
    for entry in file_pattern:
      file_names.extend(gfile.Glob(entry))
  else:
    file_names = list(gfile.Glob(file_pattern))

  if not file_names:
    raise ValueError('No files match %s.' % file_pattern)

  # Sort files so it will be deterministic for unit tests. They'll be shuffled
  # in `string_input_producer` if `randomize_input` is enabled.
  if not randomize_input:
    file_names = sorted(file_names)
  return file_names


def _get_examples(file_name_queue, reader, num_threads, read_batch_size,
                  filter_fn, parse_fn):
  """Builds per-thread read ops that dequeue and optionally parse examples.

  Args:
    file_name_queue: A queue implementation that dequeues elements in
      first-in first-out order.
    reader: A function or class that returns an object with
      `read` method, (filename tensor) -> (example tensor).
    num_threads: The number of threads enqueuing examples.
    read_batch_size: An int or scalar `Tensor` specifying the number of
      records to read at once.
    filter_fn: Filtering function, takes both keys as well as an `Example`
      Tensors and returns a boolean mask of the same shape as the input Tensors
      to be applied for filtering. If `None`, no filtering is done.
    parse_fn: Parsing function, takes `Example` Tensor returns parsed
      representation. If `None`, no parsing is done.

  Returns:
    List with one entry per reader thread, where each entry is either a
    `(keys, examples)` tuple or (when `parse_fn` returns a dict) a dict of
    parsed tensors that also carries the keys under `KEY_FEATURE_NAME`.
  """
  with ops.name_scope('read'):
    example_list = []
    for _ in range(num_threads):
      # `read_up_to` only exists for batched reads; fall back to single-record
      # `read` when read_batch_size is 1 (smart_cond folds the static case).
      keys, examples_proto = utils.smart_cond(
          read_batch_size > 1,
          lambda: reader().read_up_to(file_name_queue, read_batch_size),
          lambda: reader().read(file_name_queue))

      if filter_fn:
        mask = filter_fn(keys, examples_proto)
        keys = array_ops.boolean_mask(keys, mask)
        examples_proto = array_ops.boolean_mask(examples_proto, mask)
      if parse_fn:
        parsed_examples = parse_fn(examples_proto)
        # Map keys into example map because batch_join doesn't support
        # tuple of Tensor + dict.
        if isinstance(parsed_examples, dict):
          parsed_examples[KEY_FEATURE_NAME] = keys
          example_list.append(parsed_examples)
        else:
          example_list.append((keys, parsed_examples))
      else:
        example_list.append((keys, examples_proto))
    return example_list


def _read_keyed_batch_examples_helper(file_pattern,
                                      batch_size,
                                      reader,
                                      randomize_input=True,
                                      num_epochs=None,
                                      queue_capacity=10000,
                                      num_threads=1,
                                      read_batch_size=1,
                                      filter_fn=None,
                                      parse_fn=None,
                                      setup_shared_queue=False,
                                      name=None,
                                      seed=None):
  """Adds operations to read, queue, batch `Example` protos.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    reader: A function or class that returns an object with
      `read` method, (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If `None`, cycles through the dataset forever.
      NOTE - If specified, creates a variable that must be initialized, so call
      `tf.local_variables_initializer()` and run the op in a session.
    queue_capacity: Capacity for input queue.
    num_threads: The number of threads enqueuing examples.
    read_batch_size: An int or scalar `Tensor` specifying the number of
      records to read at once.
    filter_fn: Filtering function, takes both keys as well `Example` Tensors
      and returns a boolean mask of the same shape as the input Tensors to
      be applied for filtering. If `None`, no filtering is done.
    parse_fn: Parsing function, takes `Example` Tensor returns parsed
      representation. If `None`, no parsing is done.
    setup_shared_queue: Whether to set up a shared queue for file names.
    name: Name of resulting op.
    seed: An integer (optional). Seed used if randomize_input == True.

  Returns:
    Returns tuple of:
    - `Tensor` of string keys.
    - String `Tensor` of batched `Example` proto.

  Raises:
    ValueError: for invalid inputs.
  """
  # Retrieve files to read.
  file_names = _get_file_names(file_pattern, randomize_input)

  # Check input parameters are given and reasonable.
  # Tensor-valued sizes can't be range-checked at graph-construction time,
  # hence the isinstance(..., ops.Tensor) escapes below.
  if (not queue_capacity) or (queue_capacity <= 0):
    raise ValueError('Invalid queue_capacity %s.' % queue_capacity)
  if (batch_size is None) or (
      (not isinstance(batch_size, ops.Tensor)) and
      (batch_size <= 0 or batch_size >= queue_capacity)):
    raise ValueError('Invalid batch_size %s, with queue_capacity %s.' %
                     (batch_size, queue_capacity))
  if (read_batch_size is None) or (
      (not isinstance(read_batch_size, ops.Tensor)) and (read_batch_size <= 0)):
    raise ValueError('Invalid read_batch_size %s.' % read_batch_size)
  if (not num_threads) or (num_threads <= 0):
    raise ValueError('Invalid num_threads %s.' % num_threads)
  if (num_epochs is not None) and (num_epochs <= 0):
    raise ValueError('Invalid num_epochs %s.' % num_epochs)

  with ops.name_scope(name, 'read_batch_examples', [file_pattern]) as scope:
    with ops.name_scope('file_name_queue') as file_name_queue_scope:
      if setup_shared_queue:
        # Capacity-1 queue fed from the shared seek_next producer so every
        # worker pulls file names from one cross-session source.
        file_name_queue = data_flow_ops.FIFOQueue(
            capacity=1, dtypes=[dtypes.string], shapes=[[]])
        enqueue_op = file_name_queue.enqueue(
            input_pipeline_ops.seek_next(
                file_names,
                shuffle=randomize_input,
                num_epochs=num_epochs,
                seed=seed))
        queue_runner.add_queue_runner(
            queue_runner.QueueRunner(file_name_queue, [enqueue_op]))
      else:
        file_name_queue = input_ops.string_input_producer(
            constant_op.constant(file_names, name='input'),
            shuffle=randomize_input,
            num_epochs=num_epochs,
            name=file_name_queue_scope,
            seed=seed)

    example_list = _get_examples(file_name_queue, reader, num_threads,
                                 read_batch_size, filter_fn, parse_fn)

    enqueue_many = read_batch_size > 1

    # With a finite number of epochs the final batch may be short; allow it
    # so no records are dropped.
    if num_epochs is None:
      allow_smaller_final_batch = False
    else:
      allow_smaller_final_batch = True

    # Setup batching queue given list of read example tensors.
    if randomize_input:
      if isinstance(batch_size, ops.Tensor):
        min_after_dequeue = int(queue_capacity * 0.4)
      else:
        min_after_dequeue = max(queue_capacity - (3 * batch_size), batch_size)
      queued_examples_with_keys = input_ops.shuffle_batch_join(
          example_list,
          batch_size,
          capacity=queue_capacity,
          min_after_dequeue=min_after_dequeue,
          enqueue_many=enqueue_many,
          name=scope,
          allow_smaller_final_batch=allow_smaller_final_batch,
          seed=seed)
    else:
      queued_examples_with_keys = input_ops.batch_join(
          example_list,
          batch_size,
          capacity=queue_capacity,
          enqueue_many=enqueue_many,
          name=scope,
          allow_smaller_final_batch=allow_smaller_final_batch)
    # When parse_fn produced a dict, the keys were smuggled in under
    # KEY_FEATURE_NAME (see _get_examples); pop them back out here.
    if parse_fn and isinstance(queued_examples_with_keys, dict):
      queued_keys = queued_examples_with_keys.pop(KEY_FEATURE_NAME)
      return queued_keys, queued_examples_with_keys
    return queued_examples_with_keys


@deprecated(None, 'Use tf.data.')
def read_keyed_batch_features(file_pattern,
                              batch_size,
                              features,
                              reader,
                              randomize_input=True,
                              num_epochs=None,
                              queue_capacity=10000,
                              reader_num_threads=1,
                              feature_queue_capacity=100,
                              num_enqueue_threads=2,
                              parse_fn=None,
                              name=None,
                              read_batch_size=None):
  """Adds operations to read, queue, batch and parse `Example` protos.

  Given file pattern (or list of files), will setup a queue for file names,
  read `Example` proto using provided `reader`, use batch queue to create
  batches of examples of size `batch_size` and parse example given `features`
  specification.

  All queue runners are added to the queue runners collection, and may be
  started via `start_queue_runners`.

  All ops are added to the default graph.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    features: A `dict` mapping feature keys to `FixedLenFeature` or
      `VarLenFeature` values.
    reader: A function or class that returns an object with
      `read` method, (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If None, cycles through the dataset forever. NOTE - If specified,
      creates a variable that must be initialized, so call
      tf.local_variables_initializer() and run the op in a session.
    queue_capacity: Capacity for input queue.
    reader_num_threads: The number of threads to read examples. In order to have
      predictable and repeatable order of reading and enqueueing, such as in
      prediction and evaluation mode, `reader_num_threads` should be 1.
    feature_queue_capacity: Capacity of the parsed features queue.
    num_enqueue_threads: Number of threads to enqueue the parsed example queue.
      Using multiple threads to enqueue the parsed example queue helps maintain
      a full queue when the subsequent computations overall are cheaper than
      parsing. In order to have predictable and repeatable order of reading and
      enqueueing, such as in prediction and evaluation mode,
      `num_enqueue_threads` should be 1.
    parse_fn: Parsing function, takes `Example` Tensor returns parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    read_batch_size: An int or scalar `Tensor` specifying the number of
      records to read at once. If `None`, defaults to `batch_size`.

  Returns:
    Returns tuple of:
    - `Tensor` of string keys.
    - A dict of `Tensor` or `SparseTensor` objects for each in `features`.

  Raises:
    ValueError: for invalid inputs.
  """

  with ops.name_scope(name, 'read_batch_features', [file_pattern]) as scope:
    if read_batch_size is None:
      read_batch_size = batch_size
    keys, examples = read_keyed_batch_examples(
        file_pattern,
        batch_size,
        reader,
        randomize_input=randomize_input,
        num_epochs=num_epochs,
        queue_capacity=queue_capacity,
        num_threads=reader_num_threads,
        read_batch_size=read_batch_size,
        parse_fn=parse_fn,
        name=scope)
    # Parse the example.
    feature_map = parsing_ops.parse_example(examples, features)
    return queue_parsed_features(
        feature_map,
        keys=keys,
        feature_queue_capacity=feature_queue_capacity,
        num_enqueue_threads=num_enqueue_threads,
        name=scope)


@deprecated(None, 'Use tf.data.')
def read_keyed_batch_features_shared_queue(file_pattern,
                                           batch_size,
                                           features,
                                           reader,
                                           randomize_input=True,
                                           num_epochs=None,
                                           queue_capacity=10000,
                                           reader_num_threads=1,
                                           feature_queue_capacity=100,
                                           num_queue_runners=2,
                                           parse_fn=None,
                                           name=None):
  """Adds operations to read, queue, batch and parse `Example` protos.

  Given file pattern (or list of files), will setup a shared queue for file
  names, setup a worker queue that gets filenames from the shared queue,
  read `Example` proto using provided `reader`, use batch queue to create
  batches of examples of size `batch_size` and parse example given `features`
  specification.

  All queue runners are added to the queue runners collection, and may be
  started via `start_queue_runners`.

  All ops are added to the default graph.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    features: A `dict` mapping feature keys to `FixedLenFeature` or
      `VarLenFeature` values.
    reader: A function or class that returns an object with
      `read` method, (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If None, cycles through the dataset forever. NOTE - If specified,
      creates a variable that must be initialized, so call
      tf.local_variables_initializer() and run the op in a session.
    queue_capacity: Capacity for input queue.
    reader_num_threads: The number of threads to read examples.
    feature_queue_capacity: Capacity of the parsed features queue.
    num_queue_runners: Number of threads to enqueue the parsed example queue.
      Using multiple threads to enqueue the parsed example queue helps maintain
      a full queue when the subsequent computations overall are cheaper than
      parsing.
    parse_fn: Parsing function, takes `Example` Tensor returns parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.

  Returns:
    Returns tuple of:
    - `Tensor` of string keys.
    - A dict of `Tensor` or `SparseTensor` objects for each in `features`.

  Raises:
    ValueError: for invalid inputs.
  """

  with ops.name_scope(name, 'read_batch_features', [file_pattern]) as scope:
    keys, examples = read_keyed_batch_examples_shared_queue(
        file_pattern,
        batch_size,
        reader,
        randomize_input=randomize_input,
        num_epochs=num_epochs,
        queue_capacity=queue_capacity,
        num_threads=reader_num_threads,
        read_batch_size=batch_size,
        parse_fn=parse_fn,
        name=scope)
    # Parse the example.
    feature_map = parsing_ops.parse_example(examples, features)
    return queue_parsed_features(
        feature_map,
        keys=keys,
        feature_queue_capacity=feature_queue_capacity,
        num_enqueue_threads=num_queue_runners,
        name=scope)


@deprecated(None, 'Use tf.data.')
def queue_parsed_features(parsed_features,
                          keys=None,
                          feature_queue_capacity=100,
                          num_enqueue_threads=2,
                          name=None):
  """Speeds up parsing by using queues to do it asynchronously.

  This function adds the tensors in `parsed_features` to a queue, which allows
  the parsing (or any other expensive op before this) to be asynchronous wrt the
  rest of the training graph. This greatly improves read latency and speeds up
  training since the data will already be parsed and ready when each step of
  training needs it.

  All queue runners are added to the queue runners collection, and may be
  started via `start_queue_runners`.

  All ops are added to the default graph.

  Args:
    parsed_features: A dict of string key to `Tensor` or `SparseTensor` objects.
    keys: `Tensor` of string keys.
    feature_queue_capacity: Capacity of the parsed features queue.
    num_enqueue_threads: Number of threads to enqueue the parsed example queue.
      Using multiple threads to enqueue the parsed example queue helps maintain
      a full queue when the subsequent computations overall are cheaper than
      parsing. In order to have predictable and repeatable order of reading and
      enqueueing, such as in prediction and evaluation mode,
      `num_enqueue_threads` should be 1.
    name: Name of resulting op.

  Returns:
    Returns tuple of:
    - `Tensor` corresponding to `keys` if provided, otherwise `None`.
    -  A dict of string key to `Tensor` or `SparseTensor` objects corresponding
       to `parsed_features`.
  Raises:
    ValueError: for invalid inputs.
  """

  args = list(parsed_features.values())
  if keys is not None:
    args += [keys]

  with ops.name_scope(name, 'queue_parsed_features', args):
    # Lets also add preprocessed tensors into the queue types for each item of
    # the queue.
    tensors_to_enqueue = []
    # Each entry contains the key, and a boolean which indicates whether the
    # tensor was a sparse tensor.
    tensors_mapping = []
    # TODO(sibyl-Aix6ihai): Most of the functionality here is about pushing sparse
    # tensors into a queue. This could be taken care in somewhere else so others
    # can reuse it. Also, QueueBase maybe extended to handle sparse tensors
    # directly.
    # Iterate keys in sorted order so enqueue/dequeue positions line up
    # deterministically.
    for key in sorted(parsed_features.keys()):
      tensor = parsed_features[key]
      if isinstance(tensor, sparse_tensor.SparseTensor):
        tensors_mapping.append((key, True))
        tensors_to_enqueue.extend(
            [tensor.indices, tensor.values, tensor.dense_shape])
      else:
        tensors_mapping.append((key, False))
        tensors_to_enqueue.append(tensor)

    if keys is not None:
      tensors_to_enqueue.append(keys)

    queue_dtypes = [x.dtype for x in tensors_to_enqueue]
    input_queue = data_flow_ops.FIFOQueue(feature_queue_capacity, queue_dtypes)

    # Add a summary op to debug if our feature queue is full or not.
    summary.scalar('queue/parsed_features/%s/fraction_of_%d_full' %
                   (input_queue.name, feature_queue_capacity),
                   math_ops.cast(input_queue.size(), dtypes.float32) *
                   (1. / feature_queue_capacity))

    # Use a single QueueRunner with multiple threads to enqueue so the queue is
    # always full. The threads are coordinated so the last batch will not be
    # lost.
    enqueue_ops = [
        input_queue.enqueue(tensors_to_enqueue)
        for _ in range(num_enqueue_threads)
    ]
    queue_runner.add_queue_runner(
        queue_runner.QueueRunner(
            input_queue,
            enqueue_ops,
            queue_closed_exception_types=(errors.OutOfRangeError,
                                          errors.CancelledError)))

    dequeued_tensors = input_queue.dequeue()
    if not isinstance(dequeued_tensors, list):
      # input_queue.dequeue() returns a single tensor instead of a list of
      # tensors if there is only one tensor to dequeue, which breaks the
      # assumption of a list below.
      dequeued_tensors = [dequeued_tensors]

    # Reset shapes on dequeued tensors.
    for i in range(len(tensors_to_enqueue)):
      dequeued_tensors[i].set_shape(tensors_to_enqueue[i].get_shape())

    # Recreate feature mapping according to the original dictionary.
    dequeued_parsed_features = {}
    index = 0
    for key, is_sparse_tensor in tensors_mapping:
      if is_sparse_tensor:
        # Three tensors are (indices, values, shape).
        dequeued_parsed_features[key] = sparse_tensor.SparseTensor(
            dequeued_tensors[index], dequeued_tensors[index + 1],
            dequeued_tensors[index + 2])
        index += 3
      else:
        dequeued_parsed_features[key] = dequeued_tensors[index]
        index += 1

    dequeued_keys = None
    if keys is not None:
      # Keys were appended last in tensors_to_enqueue, so they come out last.
      dequeued_keys = dequeued_tensors[-1]

    return dequeued_keys, dequeued_parsed_features


@deprecated(None, 'Use tf.data.')
def read_batch_features(file_pattern,
                        batch_size,
                        features,
                        reader,
                        randomize_input=True,
                        num_epochs=None,
                        queue_capacity=10000,
                        feature_queue_capacity=100,
                        reader_num_threads=1,
                        num_enqueue_threads=2,
                        parse_fn=None,
                        name=None,
                        read_batch_size=None):
  """Adds operations to read, queue, batch and parse `Example` protos.

  Given file pattern (or list of files), will setup a queue for file names,
  read `Example` proto using provided `reader`, use batch queue to create
  batches of examples of size `batch_size` and parse example given `features`
  specification.

  All queue runners are added to the queue runners collection, and may be
  started via `start_queue_runners`.

  All ops are added to the default graph.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    features: A `dict` mapping feature keys to `FixedLenFeature` or
      `VarLenFeature` values.
    reader: A function or class that returns an object with
      `read` method, (filename tensor) -> (example tensor).
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If None, cycles through the dataset forever. NOTE - If specified,
      creates a variable that must be initialized, so call
      tf.local_variables_initializer() and run the op in a session.
    queue_capacity: Capacity for input queue.
    feature_queue_capacity: Capacity of the parsed features queue. Set this
      value to a small number, for example 5 if the parsed features are large.
    reader_num_threads: The number of threads to read examples. In order to have
      predictable and repeatable order of reading and enqueueing, such as in
      prediction and evaluation mode, `reader_num_threads` should be 1.
    num_enqueue_threads: Number of threads to enqueue the parsed example queue.
      Using multiple threads to enqueue the parsed example queue helps maintain
      a full queue when the subsequent computations overall are cheaper than
      parsing. In order to have predictable and repeatable order of reading and
      enqueueing, such as in prediction and evaluation mode,
      `num_enqueue_threads` should be 1.
    parse_fn: Parsing function, takes `Example` Tensor returns parsed
      representation. If `None`, no parsing is done.
    name: Name of resulting op.
    read_batch_size: An int or scalar `Tensor` specifying the number of
      records to read at once. If `None`, defaults to `batch_size`.

  Returns:
    A dict of `Tensor` or `SparseTensor` objects for each in `features`.

  Raises:
    ValueError: for invalid inputs.
  """
  # Thin wrapper: delegate to the keyed variant and drop the keys.
  _, features = read_keyed_batch_features(
      file_pattern,
      batch_size,
      features,
      reader,
      randomize_input=randomize_input,
      num_epochs=num_epochs,
      queue_capacity=queue_capacity,
      reader_num_threads=reader_num_threads,
      feature_queue_capacity=feature_queue_capacity,
      num_enqueue_threads=num_enqueue_threads,
      read_batch_size=read_batch_size,
      parse_fn=parse_fn,
      name=name)
  return features


@deprecated(None, 'Use tf.data.')
def read_batch_record_features(file_pattern,
                               batch_size,
                               features,
                               randomize_input=True,
                               num_epochs=None,
                               queue_capacity=10000,
                               reader_num_threads=1,
                               name='dequeue_record_examples'):
  """Reads TFRecord, queues, batches and parses `Example` proto.

  See more detailed description in `read_examples`.

  Args:
    file_pattern: List of files or patterns of file paths containing
      `Example` records. See `tf.gfile.Glob` for pattern rules.
    batch_size: An int or scalar `Tensor` specifying the batch size to use.
    features: A `dict` mapping feature keys to `FixedLenFeature` or
      `VarLenFeature` values.
    randomize_input: Whether the input should be randomized.
    num_epochs: Integer specifying the number of times to read through the
      dataset. If None, cycles through the dataset forever. NOTE - If specified,
      creates a variable that must be initialized, so call
      tf.local_variables_initializer() and run the op in a session.
    queue_capacity: Capacity for input queue.
    reader_num_threads: The number of threads to read examples. In order to have
      predictable and repeatable order of reading and enqueueing, such as in
      prediction and evaluation mode, `reader_num_threads` should be 1.
    name: Name of resulting op.

  Returns:
    A dict of `Tensor` or `SparseTensor` objects for each in `features`.

  Raises:
    ValueError: for invalid inputs.
  """
  # Convenience wrapper that fixes the reader to TFRecordReader.
  return read_batch_features(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=features,
      reader=io_ops.TFRecordReader,
      randomize_input=randomize_input,
      num_epochs=num_epochs,
      queue_capacity=queue_capacity,
      reader_num_threads=reader_num_threads,
      name=name)