1# Copyright 2018 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Input-pipeline utilities for Distribution strategies.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20 21from tensorflow.python.data.ops import dataset_ops 22from tensorflow.python.data.util import traverse 23from tensorflow.python.framework import op_def_registry 24from tensorflow.python.framework import ops 25from tensorflow.python.platform import tf_logging 26 27 28# TODO(priyag): Any other reader datasets to consider here? 29_READER_DATASET_OPS = [ 30 "TextLineDataset", "TFRecordDataset", "FixedLengthRecordDataset", 31 "FixedLengthRecordDatasetV2" 32] 33 34 35# pylint: disable=protected-access 36def auto_shard_dataset(dataset, num_shards, index): 37 """Shard the input pipeline by sharding the underlying list of files. 38 39 Args: 40 dataset: A `tf.data.Dataset` instance, typically the result of a bunch of 41 dataset transformations. 42 num_shards: A `tf.int64` scalar `tf.Tensor`, representing the number of 43 shards operating in parallel. Same usage as in `tf.data.Dataset.shard`. 44 index: A `tf.int64` scalar `tf.Tensor`, representing the worker index. 45 Same usage as in `tf.data.Dataset.shard`. 46 47 Returns: 48 A modified `Dataset` obtained by updating the pipeline sharded by the 49 files. The input dataset will be returned if we cannot automatically 50 determine a good way to shard the input dataset. 51 """ 52 53 # TODO(rohanj): b/120673685 to track re-enabling auto sharding. 54 tf_logging.warn("Autosharding is currently disabled. Please shard your input " 55 "manually.") 56 del num_shards, index 57 return dataset 58 59 60def _clone_dataset(dataset): 61 """Returns a cloned version of `dataset`.""" 62 variant_tensor_ops = traverse.obtain_all_variant_tensor_ops(dataset) 63 remap_dict = _clone_helper(dataset._variant_tensor.op, variant_tensor_ops) 64 new_variant_tensor = remap_dict[dataset._variant_tensor.op].outputs[0] 65 return dataset_ops._VariantDataset(new_variant_tensor, 66 dataset._element_structure) 67 68 69def _get_op_def(op): 70 return op.op_def or op_def_registry.get_registered_ops()[op.type] 71 72 73def _clone_helper(op_to_clone, variant_tensor_ops): 74 """Helper method that recursively clones `op_to_clone`. 75 76 Args: 77 op_to_clone: The op we want to clone. 78 variant_tensor_ops: A list of ops that we have to clone along the way. 79 80 Returns: 81 A dictionary mapping old_ops to new_ops created. Includes op_to_clone 82 as a key. 83 """ 84 remap_dict = {} 85 for input_tensor in op_to_clone.inputs: 86 input_tensor_op = input_tensor.op 87 if input_tensor_op in variant_tensor_ops: 88 recursive_map = _clone_helper(input_tensor_op, variant_tensor_ops) 89 remap_dict.update(recursive_map) 90 inputs_list = [] 91 for input_tensor in op_to_clone.inputs: 92 input_tensor_op = input_tensor.op 93 if input_tensor_op in remap_dict: 94 remapped_input = remap_dict[input_tensor_op].outputs[0] 95 inputs_list.append(remapped_input) 96 else: 97 inputs_list.append(input_tensor_op.outputs[input_tensor.value_index]) 98 g = ops.get_default_graph() 99 new_op = g.create_op( 100 op_to_clone.type, 101 inputs_list, [o.dtype for o in op_to_clone.outputs], 102 name=op_to_clone.name, 103 attrs=op_to_clone.node_def.attr, 104 op_def=_get_op_def(op_to_clone)) 105 remap_dict[op_to_clone] = new_op 106 return remap_dict 107