/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/framework/dataset.h"
#include "tensorflow/core/framework/partial_tensor_shape.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/variant.h"

namespace tensorflow {
namespace data {
namespace experimental {
namespace {

class DenseToSparseBatchDatasetOp : public UnaryDatasetOpKernel {
 public:
  explicit DenseToSparseBatchDatasetOp(OpKernelConstruction* ctx)
      : UnaryDatasetOpKernel(ctx) {}

  void MakeDataset(OpKernelContext* ctx, DatasetBase* input,
                   DatasetBase** output) override {
    // Create a new DenseToSparseBatchDatasetOp::Dataset, insert it in the
    // step-local container, and return it as the output.
    OP_REQUIRES(
        ctx, input->output_dtypes().size() == 1,
        errors::InvalidArgument("DenseToSparseBatchDataset only supports "
                                "inputs with a single component."));

    int64_t batch_size;
    OP_REQUIRES_OK(
        ctx, ParseScalarArgument<int64_t>(ctx, "batch_size", &batch_size));
    OP_REQUIRES(
        ctx, batch_size > 0,
        errors::InvalidArgument("Batch size must be greater than zero."));

    const Tensor* row_shape_t;
    OP_REQUIRES_OK(ctx, ctx->input("row_shape", &row_shape_t));
    OP_REQUIRES(ctx, TensorShapeUtils::IsVector(row_shape_t->shape()),
                errors::InvalidArgument("row_shape must be a vector"));
    PartialTensorShape row_shape;
    OP_REQUIRES_OK(ctx, PartialTensorShape::MakePartialShape(
                            row_shape_t->vec<int64_t>().data(),
                            row_shape_t->NumElements(), &row_shape));

    *output = nullptr;

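    // Dispatch on the dtype of the single input component: HANDLE_TYPE
    // expands to one switch case per supported dataset element type, each
    // constructing a typed Dataset<T>.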
#define HANDLE_TYPE(T)                                           \
  case DataTypeToEnum<T>::value: {                               \
    *output = new Dataset<T>(ctx, batch_size, row_shape, input); \
    break;                                                       \
  }

    switch (input->output_dtypes()[0]) {
      TF_CALL_DATASET_TYPES(HANDLE_TYPE);
#undef HANDLE_TYPE
      default:
        OP_REQUIRES(ctx, false,
                    errors::Unimplemented(
                        "DenseToSparseBatchDataset unhandled data type: ",
                        input->output_dtypes()[0]));
    }
  }

 private:
  // TODO(mrry): Push the templated code down to the raw copying routine.
  template <class T>
  class Dataset : public DatasetBase {
   public:
    Dataset(OpKernelContext* ctx, int64_t batch_size,
            const PartialTensorShape& row_shape, const DatasetBase* input)
        : DatasetBase(DatasetContext(ctx)),
          batch_size_(batch_size),
          row_shape_(row_shape),
          input_(input) {
      input_->Ref();

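      // Each output element is a single component whose dense shape is
      // [-1 (the batch dimension), row_shape...].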
      output_shapes_.reserve(1);
      PartialTensorShape output_shape({-1});
      output_shape.AppendShape(row_shape_);
      output_shapes_.push_back(output_shape);
    }

    ~Dataset() override { input_->Unref(); }

    std::unique_ptr<IteratorBase> MakeIteratorInternal(
        const string& prefix) const override {
      return std::make_unique<Iterator>(typename Iterator::Params{
          this, strings::StrCat(prefix, "::DenseToSparseBatch")});
    }

    const DataTypeVector& output_dtypes() const override {
      static DataTypeVector* output_dtypes = new DataTypeVector({DT_VARIANT});
      return *output_dtypes;
    }

    const std::vector<PartialTensorShape>& output_shapes() const override {
      return output_shapes_;
    }

    string DebugString() const override {
      return strings::StrCat("DenseToSparseBatchDatasetOp(", batch_size_,
                             ")::Dataset");
    }

    int64_t CardinalityInternal() const override {
      int64_t n = input_->Cardinality();
      if (n == kInfiniteCardinality || n == kUnknownCardinality) {
        return n;
      }
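      // Round up: a final partial batch still counts as one output element.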
      return n / batch_size_ + (n % batch_size_ == 0 ? 0 : 1);
    }

    Status InputDatasets(
        std::vector<const DatasetBase*>* inputs) const override {
      inputs->push_back(input_);
      return OkStatus();
    }

    Status CheckExternalState() const override {
      return input_->CheckExternalState();
    }

   protected:
    Status AsGraphDefInternal(SerializationContext* ctx,
                              DatasetGraphDefBuilder* b,
                              Node** output) const override {
      Node* input_node;
      TF_RETURN_IF_ERROR(b->AddInputDataset(ctx, input_, &input_node));
      Node* batch_size_node;
      TF_RETURN_IF_ERROR(b->AddScalar(batch_size_, &batch_size_node));
      Node* row_shape_node;
      std::vector<int64_t> row_shape;
      row_shape.reserve(
          row_shape_.dims());  // not an unknown rank PartialTensorShape
      for (int i = 0; i < row_shape_.dims(); i++)
        row_shape.emplace_back(row_shape_.dim_size(i));
      TF_RETURN_IF_ERROR(b->AddVector(row_shape, &row_shape_node));
      TF_RETURN_IF_ERROR(b->AddDataset(
          this, {input_node, batch_size_node, row_shape_node}, output));
      return OkStatus();
    }

   private:
    class Iterator : public DatasetIterator<Dataset<T>> {
     public:
      explicit Iterator(const typename Iterator::Params& params)
          : DatasetIterator<Dataset<T>>(params) {}

      Status Initialize(IteratorContext* ctx) override {
        return DatasetIterator<Dataset<T>>::dataset()->input_->MakeIterator(
            ctx, this, DatasetIterator<Dataset<T>>::prefix(), &input_impl_);
      }

      Status GetNextInternal(IteratorContext* ctx,
                             std::vector<Tensor>* out_tensors,
                             bool* end_of_sequence) override {
        // Each row of the output SparseTensor is an individual tensor
        // from the input iterator.
        std::vector<Tensor> batch_elements;
        int64_t total_elements = 0;
        batch_elements.reserve(
            DatasetIterator<Dataset<T>>::dataset()->batch_size_);
        const PartialTensorShape& row_shape =
            DatasetIterator<Dataset<T>>::dataset()->row_shape_;
        const int row_ndims = row_shape.dims();

        // Determine the size of the output tensors:
        // * dense_shape will be [`row_shape + 1`].
        Tensor dense_shape(ctx->allocator({}), DT_INT64, {row_ndims + 1});
        auto dense_shape_vec = dense_shape.vec<int64_t>();
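        // The batch dimension dense_shape_vec(0) is filled in after the batch
        // has been drained; dimensions declared as -1 in row_shape start at 0
        // and grow to the maximum size observed across the batch.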
        for (size_t i = 0; i < row_ndims; ++i) {
          if (row_shape.dim_size(i) == -1) {
            dense_shape_vec(i + 1) = 0;
          } else {
            dense_shape_vec(i + 1) = row_shape.dim_size(i);
          }
        }

        {
          mutex_lock l(mu_);
          *end_of_sequence = false;
          for (int i = 0;
               i < DatasetIterator<Dataset<T>>::dataset()->batch_size_ &&
               !*end_of_sequence;
               ++i) {
            std::vector<Tensor> batch_element_tuple;
            TF_RETURN_IF_ERROR(input_impl_->GetNext(ctx, &batch_element_tuple,
                                                    end_of_sequence));
            if (!*end_of_sequence) {
              DCHECK_EQ(1, batch_element_tuple.size());
              batch_elements.push_back(std::move(batch_element_tuple[0]));
              total_elements += batch_element_tuple[0].NumElements();

              // TODO(mrry): Investigate how to hoist this check when we
              // have static information that renders it unnecessary.
              if (batch_element_tuple[0].shape().dims() != row_ndims) {
                return errors::InvalidArgument(
                    "Input element had shape (",
                    batch_element_tuple[0].shape().DebugString(),
                    ") that is incompatible with the row shape (",
                    row_shape.DebugString(), ").");
              }
              for (int j = 0; j < row_ndims; ++j) {
                // Take the maximum in the dimension if -1 is given.
                if (row_shape.dim_size(j) == -1) {
                  dense_shape_vec(j + 1) =
                      std::max(batch_element_tuple[0].dim_size(j),
                               dense_shape_vec(j + 1));
                } else if (batch_element_tuple[0].dim_size(j) >
                           row_shape.dim_size(j)) {
                  return errors::DataLoss(
                      "Input element had shape (",
                      batch_element_tuple[0].shape().DebugString(),
                      ") that is larger than the row shape (",
                      row_shape.DebugString(), ").");
                }
              }
            }
          }
        }

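        // The input was exhausted before any element was produced: propagate
        // end-of-sequence instead of emitting an empty batch.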
        if (batch_elements.empty()) {
          DCHECK(*end_of_sequence);
          return OkStatus();
        }

        // * indices will be [`total_elements`, `row_shape + 1`].
        // * values will be [`total_elements`].
        Tensor indices(ctx->allocator({}), DT_INT64,
                       {total_elements, row_ndims + 1});
        Tensor values(
            ctx->allocator({}),
            DatasetIterator<Dataset<T>>::dataset()->input_->output_dtypes()[0],
            {total_elements});
        auto indices_matrix = indices.matrix<int64_t>();
        auto values_flat = values.flat<T>();

        int64_t current_position_in_values = 0;
        for (int64_t i = 0; i < batch_elements.size(); ++i) {
          const Tensor& t = batch_elements[i];
          const auto& t_flat = t.flat<T>();
          // TODO(mrry): Replace with a memcpy or something more
          // efficient. (Maybe an Eigen assign op?)
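          // Row-major strides of `t`, used below to convert the flat element
          // index `j` into per-dimension coordinates of the sparse index.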
          gtl::InlinedVector<int64_t, 4> strides(row_ndims);
          if (!strides.empty()) {
            strides[row_ndims - 1] = 1;
            for (int64_t row_dim = strides.size() - 2; row_dim >= 0;
                 --row_dim) {
              strides[row_dim] =
                  strides[row_dim + 1] * t.shape().dim_size(row_dim + 1);
            }
          }

          for (int64_t j = 0; j < t.NumElements(); ++j) {
            values_flat(current_position_in_values) = t_flat(j);
            indices_matrix(current_position_in_values, 0) = i;
            int64_t index = j;
            for (size_t k = 0; k < strides.size(); ++k) {
              indices_matrix(current_position_in_values, k + 1) =
                  index / strides[k];
              index %= strides[k];
            }
            ++current_position_in_values;
          }
        }

        dense_shape_vec(0) = batch_elements.size();

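        // Pack (indices, values, dense_shape) into a length-3 DT_VARIANT
        // vector, the serialized form of the output SparseTensor.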
        Tensor serialized_sparse(DT_VARIANT, TensorShape({3}));
        auto serialized_sparse_t = serialized_sparse.vec<Variant>();
        serialized_sparse_t(0) = std::move(indices);
        serialized_sparse_t(1) = std::move(values);
        serialized_sparse_t(2) = std::move(dense_shape);
        out_tensors->push_back(std::move(serialized_sparse));

        *end_of_sequence = false;
        return OkStatus();
      }

     protected:
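      // For the performance model: each output element consumes exactly
      // `batch_size_` input elements.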
      std::shared_ptr<model::Node> CreateNode(
          IteratorContext* ctx, model::Node::Args args) const override {
        return model::MakeKnownRatioNode(
            std::move(args),
            DatasetIterator<Dataset<T>>::dataset()->batch_size_);
      }

      Status SaveInternal(SerializationContext* ctx,
                          IteratorStateWriter* writer) override {
        mutex_lock l(mu_);
        TF_RETURN_IF_ERROR(Iterator::SaveInput(ctx, writer, input_impl_));
        return OkStatus();
      }

      Status RestoreInternal(IteratorContext* ctx,
                             IteratorStateReader* reader) override {
        mutex_lock l(mu_);
        TF_RETURN_IF_ERROR(Iterator::RestoreInput(ctx, reader, input_impl_));
        return OkStatus();
      }

     private:
      mutex mu_;
      std::unique_ptr<IteratorBase> input_impl_ TF_GUARDED_BY(mu_);
    };

    const int64_t batch_size_;
    const PartialTensorShape row_shape_;
    const DatasetBase* const input_;
    std::vector<PartialTensorShape> output_shapes_;
  };
};

REGISTER_KERNEL_BUILDER(Name("DenseToSparseBatchDataset").Device(DEVICE_CPU),
                        DenseToSparseBatchDatasetOp);
REGISTER_KERNEL_BUILDER(
    Name("ExperimentalDenseToSparseBatchDataset").Device(DEVICE_CPU),
    DenseToSparseBatchDatasetOp);

}  // namespace
}  // namespace experimental
}  // namespace data
}  // namespace tensorflow