/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// See docs in ../ops/math_ops.cc.

#include "tensorflow/core/platform/errors.h"
#define EIGEN_USE_THREADS

#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/kernels/bincount_op.h"
#include "tensorflow/core/kernels/fill_functor.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

using thread::ThreadPool;

typedef Eigen::ThreadPoolDevice CPUDevice;
typedef Eigen::GpuDevice GPUDevice;

namespace functor {

template <typename Tidx, typename T>
struct BincountFunctor<CPUDevice, Tidx, T, true> {
  static Status Compute(OpKernelContext* context,
                        const typename TTypes<Tidx, 1>::ConstTensor& arr,
                        const typename TTypes<T, 1>::ConstTensor& weights,
                        typename TTypes<T, 1>::Tensor& output,
                        const Tidx num_bins) {
    Tensor all_nonneg_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DT_BOOL, TensorShape({}), &all_nonneg_t, AllocatorAttributes()));
    all_nonneg_t.scalar<bool>().device(context->eigen_cpu_device()) =
        (arr >= Tidx(0)).all();
    if (!all_nonneg_t.scalar<bool>()()) {
      return errors::InvalidArgument("Input arr must be non-negative!");
    }

    // Allocate partial output bin sums for each worker thread. Worker ids in
    // ParallelForWithWorkerId range from 0 to NumThreads() inclusive.
    ThreadPool* thread_pool =
        context->device()->tensorflow_cpu_worker_threads()->workers;
    const int64 num_threads = thread_pool->NumThreads() + 1;
    Tensor partial_bins_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DT_BOOL, TensorShape({num_threads, num_bins}), &partial_bins_t));
    auto partial_bins = partial_bins_t.matrix<bool>();
    partial_bins.setZero();
    thread_pool->ParallelForWithWorkerId(
        arr.size(), 8 /* cost */,
        [&](int64 start_ind, int64 limit_ind, int64 worker_id) {
          for (int64 i = start_ind; i < limit_ind; i++) {
            Tidx value = arr(i);
            if (value < num_bins) {
              partial_bins(worker_id, value) = true;
            }
          }
        });

    // Merge the partial bins along the 0th axis (a logical OR, since this
    // specialization only records presence per bin).
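    // For example: with arr = {1, 2, 2, 4} and num_bins = 5, each worker marks
    // the bins it sees and the merged output is {0, 1, 1, 0, 1} -- presence
    // flags rather than counts.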
    Eigen::array<int, 1> reduce_dim({0});
    output.device(context->eigen_cpu_device()) =
        partial_bins.any(reduce_dim).cast<T>();
    return Status::OK();
  }
};

template <typename Tidx, typename T>
struct BincountFunctor<CPUDevice, Tidx, T, false> {
  static Status Compute(OpKernelContext* context,
                        const typename TTypes<Tidx, 1>::ConstTensor& arr,
                        const typename TTypes<T, 1>::ConstTensor& weights,
                        typename TTypes<T, 1>::Tensor& output,
                        const Tidx num_bins) {
    Tensor all_nonneg_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DT_BOOL, TensorShape({}), &all_nonneg_t, AllocatorAttributes()));
    all_nonneg_t.scalar<bool>().device(context->eigen_cpu_device()) =
        (arr >= Tidx(0)).all();
    if (!all_nonneg_t.scalar<bool>()()) {
      return errors::InvalidArgument("Input arr must be non-negative!");
    }

    // Allocate partial output bin sums for each worker thread. Worker ids in
    // ParallelForWithWorkerId range from 0 to NumThreads() inclusive.
    ThreadPool* thread_pool =
        context->device()->tensorflow_cpu_worker_threads()->workers;
    const int64 num_threads = thread_pool->NumThreads() + 1;
    Tensor partial_bins_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DataTypeToEnum<T>::value, TensorShape({num_threads, num_bins}),
        &partial_bins_t));
    auto partial_bins = partial_bins_t.matrix<T>();
    partial_bins.setZero();
    thread_pool->ParallelForWithWorkerId(
        arr.size(), 8 /* cost */,
        [&](int64 start_ind, int64 limit_ind, int64 worker_id) {
          for (int64 i = start_ind; i < limit_ind; i++) {
            Tidx value = arr(i);
            if (value < num_bins) {
              if (weights.size()) {
                partial_bins(worker_id, value) += weights(i);
              } else {
                // Complex numbers don't support "++".
                partial_bins(worker_id, value) += T(1);
              }
            }
          }
        });

    // Sum the partial bins along the 0th axis.
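    // For example: with arr = {1, 2, 2}, weights = {0.5, 1.0, 2.0} and
    // num_bins = 4, the summed output is {0, 0.5, 3.0, 0}; with empty weights
    // each occurrence contributes T(1), giving the plain counts {0, 1, 2, 0}.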
    Eigen::array<int, 1> reduce_dim({0});
    output.device(context->eigen_cpu_device()) = partial_bins.sum(reduce_dim);
    return Status::OK();
  }
};

// Row-wise bincount: bins the values of each row of `in` into the
// corresponding row of `out`.
template <typename Tidx, typename T, bool binary_output>
struct BincountReduceFunctor<CPUDevice, Tidx, T, binary_output> {
  static Status Compute(OpKernelContext* context,
                        const typename TTypes<Tidx, 2>::ConstTensor& in,
                        const typename TTypes<T, 2>::ConstTensor& weights,
                        typename TTypes<T, 2>::Tensor& out,
                        const Tidx num_bins) {
    const int num_rows = out.dimension(0);
    const int num_cols = in.dimension(1);
    ThreadPool* thread_pool =
        context->device()->tensorflow_cpu_worker_threads()->workers;
    thread_pool->ParallelForWithWorkerId(
        num_rows, 8 /* cost */,
        [&](int64 start_row, int64 end_row, int64 worker_id) {
          for (int64 i = start_row; i < end_row; ++i) {
            for (int64 j = 0; j < num_cols; ++j) {
              Tidx value = in(i, j);
              if (value < num_bins) {
                if (binary_output) {
                  out(i, value) = T(1);
                } else {
                  if (weights.size()) {
                    out(i, value) += weights(i, j);
                  } else {
                    out(i, value) += T(1);
                  }
                }
              }
            }
          }
        });
    return Status::OK();
  }
};

}  // namespace functor

template <typename Device, typename T>
class BincountOp : public OpKernel {
 public:
  explicit BincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  void Compute(OpKernelContext* ctx) override {
    const Tensor& arr_t = ctx->input(0);
    const Tensor& size_tensor = ctx->input(1);
    OP_REQUIRES(ctx, size_tensor.dims() == 0,
                errors::InvalidArgument("Shape must be rank 0 but is rank ",
                                        size_tensor.dims()));
    int32 size = size_tensor.scalar<int32>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    const Tensor& weights_t = ctx->input(2);
    const auto arr = arr_t.flat<int32>();
    const auto weights = weights_t.flat<T>();
    Tensor* output_t;
    OP_REQUIRES_OK(ctx,
                   ctx->allocate_output(0, TensorShape({size}), &output_t));
    auto output = output_t->flat<T>();
    OP_REQUIRES_OK(ctx,
                   functor::BincountFunctor<Device, int32, T, false>::Compute(
                       ctx, arr, weights, output, size));
  }
};

#define REGISTER_KERNELS(type)                                       \
  REGISTER_KERNEL_BUILDER(                                           \
      Name("Bincount").Device(DEVICE_CPU).TypeConstraint<type>("T"), \
      BincountOp<CPUDevice, type>)

TF_CALL_NUMBER_TYPES(REGISTER_KERNELS);
#undef REGISTER_KERNELS

#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#define REGISTER_KERNELS(type)                            \
  REGISTER_KERNEL_BUILDER(Name("Bincount")                \
                              .Device(DEVICE_GPU)         \
                              .HostMemory("size")         \
                              .TypeConstraint<type>("T"), \
                          BincountOp<GPUDevice, type>)

TF_CALL_int32(REGISTER_KERNELS);
TF_CALL_float(REGISTER_KERNELS);
#undef REGISTER_KERNELS

#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

template <typename Device, typename Tidx, typename T>
class DenseBincountOp : public OpKernel {
 public:
  explicit DenseBincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("binary_output", &binary_output_));
  }

  void Compute(OpKernelContext* ctx) override {
    const Tensor& data = ctx->input(0);
    OP_REQUIRES(ctx, data.dims() <= 2,
                errors::InvalidArgument(
                    "Shape must be at most rank 2 but is rank ", data.dims()));

    const Tensor& size_t = ctx->input(1);
    const Tensor& weights = ctx->input(2);

    Tidx size = size_t.scalar<Tidx>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    Tensor* out_t;
    functor::SetZeroFunctor<Device, T> fill;
    if (data.dims() == 1) {
      OP_REQUIRES_OK(ctx,
                     ctx->allocate_output(0, TensorShape({size}), &out_t));
      auto out = out_t->flat<T>();
      fill(ctx->eigen_device<Device>(), out);
      if (binary_output_) {
        OP_REQUIRES_OK(
            ctx, functor::BincountFunctor<Device, Tidx, T, true>::Compute(
                     ctx, data.flat<Tidx>(), weights.flat<T>(), out, size));
      } else {
        OP_REQUIRES_OK(
            ctx, functor::BincountFunctor<Device, Tidx, T, false>::Compute(
                     ctx, data.flat<Tidx>(), weights.flat<T>(), out, size));
      }
    } else if (data.dims() == 2) {
      const int64 num_rows = data.dim_size(0);
      // An empty weights input is reshaped to a 0x0 matrix so the functor can
      // detect the "no weights" case via weights.size().
      auto weight_matrix =
          (weights.NumElements() == 0)
              ? weights.shaped<T, 2>(gtl::InlinedVector<int64, 2>(2, 0))
              : weights.matrix<T>();
      OP_REQUIRES_OK(
          ctx, ctx->allocate_output(0, TensorShape({num_rows, size}), &out_t));
      auto out = out_t->matrix<T>();
      fill(ctx->eigen_device<Device>(), out_t->flat<T>());
      if (binary_output_) {
        OP_REQUIRES_OK(
            ctx, functor::BincountReduceFunctor<Device, Tidx, T, true>::Compute(
                     ctx, data.matrix<Tidx>(), weight_matrix, out, size));
      } else {
        OP_REQUIRES_OK(
            ctx,
            functor::BincountReduceFunctor<Device, Tidx, T, false>::Compute(
                ctx, data.matrix<Tidx>(), weight_matrix, out, size));
      }
    }
  }

 private:
  bool binary_output_;
};

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("DenseBincount")              \
                              .Device(DEVICE_CPU)            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          DenseBincountOp<CPUDevice, Tidx, T>);
#define REGISTER_CPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64, T);

TF_CALL_NUMBER_TYPES(REGISTER_CPU_KERNELS);
#undef REGISTER_CPU_KERNELS
#undef REGISTER_KERNELS

#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("DenseBincount")              \
                              .Device(DEVICE_GPU)            \
                              .HostMemory("size")            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          DenseBincountOp<GPUDevice, Tidx, T>);
#define REGISTER_GPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64, T);

TF_CALL_int32(REGISTER_GPU_KERNELS);
TF_CALL_float(REGISTER_GPU_KERNELS);
#undef REGISTER_GPU_KERNELS
#undef REGISTER_KERNELS

#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

template <typename Device, typename Tidx, typename T>
class SparseBincountOp : public OpKernel {
 public:
  explicit SparseBincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("binary_output", &binary_output_));
  }

  void Compute(OpKernelContext* ctx) override {
    const Tensor& indices = ctx->input(0);
    const auto values = ctx->input(1).flat<Tidx>();
    const Tensor& dense_shape = ctx->input(2);
    const Tensor& size_t = ctx->input(3);
    const auto weights = ctx->input(4).flat<T>();
    const int64 weights_size = weights.size();

    Tidx size = size_t.scalar<Tidx>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    bool is_1d = dense_shape.NumElements() == 1;

    Tensor* out_t;
    functor::SetZeroFunctor<Device, T> fill;
    if (is_1d) {
      OP_REQUIRES_OK(ctx,
                     ctx->allocate_output(0, TensorShape({size}), &out_t));
      auto out = out_t->flat<T>();
      fill(ctx->eigen_device<Device>(), out);
      if (binary_output_) {
        OP_REQUIRES_OK(ctx,
                       functor::BincountFunctor<Device, Tidx, T, true>::Compute(
                           ctx, values, weights, out, size));
      } else {
        OP_REQUIRES_OK(
            ctx, functor::BincountFunctor<Device, Tidx, T, false>::Compute(
                     ctx, values, weights, out, size));
      }
    } else {
      const auto shape = dense_shape.flat<int64>();
      const int64 num_rows = shape(0);
      OP_REQUIRES_OK(
          ctx, ctx->allocate_output(0, TensorShape({num_rows, size}), &out_t));
      const auto out = out_t->matrix<T>();
      fill(ctx->eigen_device<Device>(), out_t->flat<T>());
      const auto indices_mat = indices.matrix<int64>();
      for (int64 i = 0; i < indices_mat.dimension(0); ++i) {
        const int64 batch = indices_mat(i, 0);
        const Tidx bin = values(i);
        if (bin < size) {
          if (binary_output_) {
            out(batch, bin) = T(1);
          } else {
            if (weights_size) {
              out(batch, bin) += weights(i);
            } else {
              out(batch, bin) += T(1);
            }
          }
        }
      }
    }
  }

 private:
  bool binary_output_;
};

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("SparseBincount")             \
                              .Device(DEVICE_CPU)            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          SparseBincountOp<CPUDevice, Tidx, T>);
#define REGISTER_CPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64, T);

TF_CALL_NUMBER_TYPES(REGISTER_CPU_KERNELS);
#undef REGISTER_CPU_KERNELS
#undef REGISTER_KERNELS

template <typename Device, typename Tidx, typename T>
class RaggedBincountOp : public OpKernel {
 public:
  explicit RaggedBincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("binary_output", &binary_output_));
  }

  void Compute(OpKernelContext* ctx) override {
    const auto splits = ctx->input(0).flat<int64>();
    const auto values = ctx->input(1).flat<Tidx>();
    const Tensor& size_t = ctx->input(2);
    const auto weights = ctx->input(3).flat<T>();
    const int64 weights_size = weights.size();

    Tidx size = size_t.scalar<Tidx>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    int num_rows = splits.size() - 1;
    int num_values = values.size();
    int batch_idx = 0;

    Tensor* out_t;
    OP_REQUIRES_OK(
        ctx, ctx->allocate_output(0, TensorShape({num_rows, size}), &out_t));
    functor::SetZeroFunctor<Device, T> fill;
    fill(ctx->eigen_device<Device>(), out_t->flat<T>());
    const auto out = out_t->matrix<T>();

    // `splits` holds row-start offsets into `values`; advance `batch_idx`
    // until `idx` falls inside the current row, so `batch_idx - 1` is the
    // output row for values(idx).
    for (int idx = 0; idx < num_values; ++idx) {
      while (idx >= splits(batch_idx)) {
        batch_idx++;
      }
      Tidx bin = values(idx);
      OP_REQUIRES(ctx, bin >= 0,
                  errors::InvalidArgument("Input must be non-negative"));
      if (bin < size) {
        if (binary_output_) {
          out(batch_idx - 1, bin) = T(1);
        } else {
          T value = (weights_size > 0) ? weights(idx) : T(1);
          out(batch_idx - 1, bin) += value;
        }
      }
    }
  }

 private:
  bool binary_output_;
};

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("RaggedBincount")             \
                              .Device(DEVICE_CPU)            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          RaggedBincountOp<CPUDevice, Tidx, T>);
#define REGISTER_CPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64, T);

TF_CALL_NUMBER_TYPES(REGISTER_CPU_KERNELS);
#undef REGISTER_CPU_KERNELS
#undef REGISTER_KERNELS

}  // end namespace tensorflow