/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// See docs in ../ops/math_ops.cc.

#include "tensorflow/core/platform/errors.h"
#define EIGEN_USE_THREADS

#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/kernels/bincount_op.h"
#include "tensorflow/core/kernels/fill_functor.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

using thread::ThreadPool;

typedef Eigen::ThreadPoolDevice CPUDevice;
typedef Eigen::GpuDevice GPUDevice;

namespace functor {

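// CPU specialization of BincountFunctor for binary_output = true. Instead of
// counting, it only records which bins occur: each worker thread marks hits
// in its own boolean row, and the rows are then OR-reduced along axis 0 and
// cast to T. For example, arr = [1, 1, 3] with num_bins = 4 produces
// output = [0, 1, 0, 1]; values >= num_bins are ignored and negative values
// are rejected with InvalidArgument.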
template <typename Tidx, typename T>
struct BincountFunctor<CPUDevice, Tidx, T, true> {
  static Status Compute(OpKernelContext* context,
                        const typename TTypes<Tidx, 1>::ConstTensor& arr,
                        const typename TTypes<T, 1>::ConstTensor& weights,
                        typename TTypes<T, 1>::Tensor& output,
                        const Tidx num_bins) {
    Tensor all_nonneg_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DT_BOOL, TensorShape({}), &all_nonneg_t, AllocatorAttributes()));
    all_nonneg_t.scalar<bool>().device(context->eigen_cpu_device()) =
        (arr >= Tidx(0)).all();
    if (!all_nonneg_t.scalar<bool>()()) {
      return errors::InvalidArgument("Input arr must be non-negative!");
    }

    // Allocate partial output bins for each worker thread. Worker ids in
    // ParallelForWithWorkerId range from 0 to NumThreads() inclusive.
    ThreadPool* thread_pool =
        context->device()->tensorflow_cpu_worker_threads()->workers;
    const int64_t num_threads = thread_pool->NumThreads() + 1;
    Tensor partial_bins_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DT_BOOL, TensorShape({num_threads, num_bins}), &partial_bins_t));
    auto partial_bins = partial_bins_t.matrix<bool>();
    partial_bins.setZero();
    thread_pool->ParallelForWithWorkerId(
        arr.size(), 8 /* cost */,
        [&](int64_t start_ind, int64_t limit_ind, int64_t worker_id) {
          for (int64_t i = start_ind; i < limit_ind; i++) {
            Tidx value = arr(i);
            if (value < num_bins) {
              partial_bins(worker_id, value) = true;
            }
          }
        });

    // OR (any-reduce) the partial bins along the 0th axis.
    Eigen::array<int, 1> reduce_dim({0});
    output.device(context->eigen_cpu_device()) =
        partial_bins.any(reduce_dim).cast<T>();
    return Status::OK();
  }
};

template <typename Tidx, typename T>
struct BincountFunctor<CPUDevice, Tidx, T, false> {
  static Status Compute(OpKernelContext* context,
                        const typename TTypes<Tidx, 1>::ConstTensor& arr,
                        const typename TTypes<T, 1>::ConstTensor& weights,
                        typename TTypes<T, 1>::Tensor& output,
                        const Tidx num_bins) {
    Tensor all_nonneg_t;
    TF_RETURN_IF_ERROR(context->allocate_temp(
        DT_BOOL, TensorShape({}), &all_nonneg_t, AllocatorAttributes()));
    all_nonneg_t.scalar<bool>().device(context->eigen_cpu_device()) =
        (arr >= Tidx(0)).all();
    if (!all_nonneg_t.scalar<bool>()()) {
      return errors::InvalidArgument("Input arr must be non-negative!");
    }

    // Allocate partial output bin sums for each worker thread. Worker ids in
    // ParallelForWithWorkerId range from 0 to NumThreads() inclusive.
    ThreadPool* thread_pool =
        context->device()->tensorflow_cpu_worker_threads()->workers;
    const int64_t num_threads = thread_pool->NumThreads() + 1;
    const Tidx* arr_data = arr.data();
    const std::ptrdiff_t arr_size = arr.size();
    const T* weight_data = weights.data();
    if (weights.size() && weights.size() != arr_size) {
      return errors::InvalidArgument(
          "Input indices and weights must have the same size.");
    }
    if (num_threads == 1) {
      output.setZero();
      T* output_data = output.data();
      if (weights.size()) {
        for (int64_t i = 0; i < arr_size; i++) {
          const Tidx value = arr_data[i];
          if (value < num_bins) {
            output_data[value] += weight_data[i];
          }
        }
      } else {
        for (int64_t i = 0; i < arr_size; i++) {
          const Tidx value = arr_data[i];
          if (value < num_bins) {
            // Complex numbers don't support "++".
            output_data[value] += T(1);
          }
        }
      }
    } else {
      Tensor partial_bins_t;
      TF_RETURN_IF_ERROR(context->allocate_temp(
          DataTypeToEnum<T>::value, TensorShape({num_threads, num_bins}),
          &partial_bins_t));
      auto partial_bins = partial_bins_t.matrix<T>();
      partial_bins.setZero();
      thread_pool->ParallelForWithWorkerId(
          arr_size, 8 /* cost */,
          [&](int64_t start_ind, int64_t limit_ind, int64_t worker_id) {
            if (weights.size()) {
              for (int64_t i = start_ind; i < limit_ind; i++) {
                Tidx value = arr_data[i];
                if (value < num_bins) {
                  partial_bins(worker_id, value) += weight_data[i];
                }
              }
            } else {
              for (int64_t i = start_ind; i < limit_ind; i++) {
                Tidx value = arr_data[i];
                if (value < num_bins) {
                  // Complex numbers don't support "++".
                  partial_bins(worker_id, value) += T(1);
                }
              }
            }
          });

      // Sum the partial bins along the 0th axis.
      Eigen::array<int, 1> reduce_dim({0});
      output.device(context->eigen_cpu_device()) =
          partial_bins.sum(reduce_dim);
    }
    return Status::OK();
  }
};

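// CPU implementation of BincountReduceFunctor, used for rank-2 input: row i
// of `in` is binned into row i of `out`, which the caller is expected to have
// zero-filled. For example, in = [[1, 1, 2], [0, 2, 2]] with num_bins = 3
// gives out = [[0, 2, 1], [1, 0, 2]] when counting and [[0, 1, 1], [1, 0, 1]]
// when binary_output is true; values >= num_bins are skipped.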
template <typename Tidx, typename T, bool binary_output>
struct BincountReduceFunctor<CPUDevice, Tidx, T, binary_output> {
  static Status Compute(OpKernelContext* context,
                        const typename TTypes<Tidx, 2>::ConstTensor& in,
                        const typename TTypes<T, 2>::ConstTensor& weights,
                        typename TTypes<T, 2>::Tensor& out,
                        const Tidx num_bins) {
    const int num_rows = out.dimension(0);
    const int num_cols = in.dimension(1);
    ThreadPool* thread_pool =
        context->device()->tensorflow_cpu_worker_threads()->workers;
    thread_pool->ParallelForWithWorkerId(
        num_rows, 8 /* cost */,
        [&](int64_t start_row, int64_t end_row, int64_t worker_id) {
          for (int64_t i = start_row; i < end_row; ++i) {
            for (int64_t j = 0; j < num_cols; ++j) {
              Tidx value = in(i, j);
              if (value < num_bins) {
                if (binary_output) {
                  out(i, value) = T(1);
                } else {
                  if (weights.size()) {
                    out(i, value) += weights(i, j);
                  } else {
                    out(i, value) += T(1);
                  }
                }
              }
            }
          }
        });
    return Status::OK();
  }
};

}  // namespace functor

template <typename Device, typename T>
class BincountOp : public OpKernel {
 public:
  explicit BincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {}

  void Compute(OpKernelContext* ctx) override {
    const Tensor& arr_t = ctx->input(0);
    const Tensor& size_tensor = ctx->input(1);
    OP_REQUIRES(ctx, size_tensor.dims() == 0,
                errors::InvalidArgument("Shape must be rank 0 but is rank ",
                                        size_tensor.dims()));
    int32_t size = size_tensor.scalar<int32_t>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    const Tensor& weights_t = ctx->input(2);
    const auto arr = arr_t.flat<int32_t>();
    const auto weights = weights_t.flat<T>();
    Tensor* output_t;
    OP_REQUIRES_OK(ctx,
                   ctx->allocate_output(0, TensorShape({size}), &output_t));
    auto output = output_t->flat<T>();
    OP_REQUIRES_OK(
        ctx, functor::BincountFunctor<Device, int32_t, T, false>::Compute(
                 ctx, arr, weights, output, size));
  }
};

#define REGISTER_KERNELS(type)                                       \
  REGISTER_KERNEL_BUILDER(                                           \
      Name("Bincount").Device(DEVICE_CPU).TypeConstraint<type>("T"), \
      BincountOp<CPUDevice, type>)

TF_CALL_NUMBER_TYPES(REGISTER_KERNELS);
#undef REGISTER_KERNELS

#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#define REGISTER_KERNELS(type)                            \
  REGISTER_KERNEL_BUILDER(Name("Bincount")                \
                              .Device(DEVICE_GPU)         \
                              .HostMemory("size")         \
                              .TypeConstraint<type>("T"), \
                          BincountOp<GPUDevice, type>)

TF_CALL_int32(REGISTER_KERNELS);
TF_CALL_float(REGISTER_KERNELS);
#undef REGISTER_KERNELS

#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

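// DenseBincount handles dense rank-1 or rank-2 input. Rank-1 input yields a
// single histogram of length `size`; rank-2 input of shape [num_rows, ...]
// yields one histogram per row, i.e. output shape [num_rows, size]. With
// binary_output = true each bin holds 1 if it occurs at least once rather
// than a count. For example, data = [[3, 1, 3]] with size = 4 and no weights
// yields [[0, 1, 0, 2]], or [[0, 1, 0, 1]] with binary_output = true.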
template <typename Device, typename Tidx, typename T>
class DenseBincountOp : public OpKernel {
 public:
  explicit DenseBincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("binary_output", &binary_output_));
  }

  void Compute(OpKernelContext* ctx) override {
    const Tensor& data = ctx->input(0);
    OP_REQUIRES(ctx, data.dims() <= 2,
                errors::InvalidArgument(
                    "Shape must be at most rank 2 but is rank ", data.dims()));

    const Tensor& size_t = ctx->input(1);
    const Tensor& weights = ctx->input(2);

    Tidx size = size_t.scalar<Tidx>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    Tensor* out_t;
    functor::SetZeroFunctor<Device, T> fill;
    if (data.dims() == 1) {
      OP_REQUIRES_OK(ctx,
                     ctx->allocate_output(0, TensorShape({size}), &out_t));
      auto out = out_t->flat<T>();
      fill(ctx->eigen_device<Device>(), out);
      if (binary_output_) {
        OP_REQUIRES_OK(
            ctx, functor::BincountFunctor<Device, Tidx, T, true>::Compute(
                     ctx, data.flat<Tidx>(), weights.flat<T>(), out, size));
      } else {
        OP_REQUIRES_OK(
            ctx, functor::BincountFunctor<Device, Tidx, T, false>::Compute(
                     ctx, data.flat<Tidx>(), weights.flat<T>(), out, size));
      }
    } else if (data.dims() == 2) {
      const int64_t num_rows = data.dim_size(0);
      auto weight_matrix =
          (weights.NumElements() == 0)
              ? weights.shaped<T, 2>(gtl::InlinedVector<int64_t, 2>(2, 0))
              : weights.matrix<T>();
      OP_REQUIRES_OK(
          ctx, ctx->allocate_output(0, TensorShape({num_rows, size}), &out_t));
      auto out = out_t->matrix<T>();
      fill(ctx->eigen_device<Device>(), out_t->flat<T>());
      if (binary_output_) {
        OP_REQUIRES_OK(
            ctx, functor::BincountReduceFunctor<Device, Tidx, T, true>::Compute(
                     ctx, data.matrix<Tidx>(), weight_matrix, out, size));
      } else {
        OP_REQUIRES_OK(
            ctx,
            functor::BincountReduceFunctor<Device, Tidx, T, false>::Compute(
                ctx, data.matrix<Tidx>(), weight_matrix, out, size));
      }
    }
  }

 private:
  bool binary_output_;
};

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("DenseBincount")              \
                              .Device(DEVICE_CPU)            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          DenseBincountOp<CPUDevice, Tidx, T>);
#define REGISTER_CPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64, T);

TF_CALL_NUMBER_TYPES(REGISTER_CPU_KERNELS);
#undef REGISTER_CPU_KERNELS
#undef REGISTER_KERNELS

#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("DenseBincount")              \
                              .Device(DEVICE_GPU)            \
                              .HostMemory("size")            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          DenseBincountOp<GPUDevice, Tidx, T>);
#define REGISTER_GPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64, T);

TF_CALL_int32(REGISTER_GPU_KERNELS);
TF_CALL_float(REGISTER_GPU_KERNELS);
#undef REGISTER_GPU_KERNELS
#undef REGISTER_KERNELS

#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM

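// SparseBincount takes a sparse tensor as (indices, values, dense_shape).
// With a rank-1 dense shape all values feed a single histogram; with a
// rank-2 dense shape the first column of `indices` selects the output row,
// giving an output of shape [dense_shape[0], size]. For example,
// indices = [[0, 1], [2, 0]], values = [3, 1], dense_shape = [3, 4] and
// size = 4 yield [[0, 0, 0, 1], [0, 0, 0, 0], [0, 1, 0, 0]].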
template <typename Device, typename Tidx, typename T>
class SparseBincountOp : public OpKernel {
 public:
  explicit SparseBincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("binary_output", &binary_output_));
  }

  void Compute(OpKernelContext* ctx) override {
    const Tensor& indices = ctx->input(0);
    const auto values = ctx->input(1).flat<Tidx>();
    const Tensor& dense_shape = ctx->input(2);
    const Tensor& size_t = ctx->input(3);
    const auto weights = ctx->input(4).flat<T>();
    const int64_t weights_size = weights.size();

    Tidx size = size_t.scalar<Tidx>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    bool is_1d = dense_shape.NumElements() == 1;

    Tensor* out_t;
    functor::SetZeroFunctor<Device, T> fill;
    if (is_1d) {
      OP_REQUIRES_OK(ctx,
                     ctx->allocate_output(0, TensorShape({size}), &out_t));
      auto out = out_t->flat<T>();
      fill(ctx->eigen_device<Device>(), out);
      if (binary_output_) {
        OP_REQUIRES_OK(ctx,
                       functor::BincountFunctor<Device, Tidx, T, true>::Compute(
                           ctx, values, weights, out, size));
      } else {
        OP_REQUIRES_OK(
            ctx, functor::BincountFunctor<Device, Tidx, T, false>::Compute(
                     ctx, values, weights, out, size));
      }
    } else {
      const auto shape = dense_shape.flat<int64_t>();
      const int64_t num_rows = shape(0);
      OP_REQUIRES_OK(
          ctx, ctx->allocate_output(0, TensorShape({num_rows, size}), &out_t));
      const auto out = out_t->matrix<T>();
      fill(ctx->eigen_device<Device>(), out_t->flat<T>());
      const auto indices_mat = indices.matrix<int64_t>();
      for (int64_t i = 0; i < indices_mat.dimension(0); ++i) {
        const int64_t batch = indices_mat(i, 0);
        const Tidx bin = values(i);
        if (bin < size) {
          if (binary_output_) {
            out(batch, bin) = T(1);
          } else {
            if (weights_size) {
              out(batch, bin) += weights(i);
            } else {
              out(batch, bin) += T(1);
            }
          }
        }
      }
    }
  }

 private:
  bool binary_output_;
};

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("SparseBincount")             \
                              .Device(DEVICE_CPU)            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          SparseBincountOp<CPUDevice, Tidx, T>);
#define REGISTER_CPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64, T);

TF_CALL_NUMBER_TYPES(REGISTER_CPU_KERNELS);
#undef REGISTER_CPU_KERNELS
#undef REGISTER_KERNELS

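// RaggedBincount takes a ragged tensor as (splits, values). `splits` has
// num_rows + 1 entries, must start at 0 and end at values.size(), and row r
// covers values[splits[r]:splits[r+1]]; the output has shape
// [num_rows, size]. For example, splits = [0, 2, 5],
// values = [1, 1, 0, 2, 2] and size = 3 yield [[0, 2, 0], [1, 0, 2]].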
template <typename Device, typename Tidx, typename T>
class RaggedBincountOp : public OpKernel {
 public:
  explicit RaggedBincountOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("binary_output", &binary_output_));
  }

  void Compute(OpKernelContext* ctx) override {
    const auto splits = ctx->input(0).flat<int64_t>();
    const auto values = ctx->input(1).flat<Tidx>();
    const Tensor& size_t = ctx->input(2);
    const auto weights = ctx->input(3).flat<T>();
    const int64_t weights_size = weights.size();

    Tidx size = size_t.scalar<Tidx>()();
    OP_REQUIRES(
        ctx, size >= 0,
        errors::InvalidArgument("size (", size, ") must be non-negative"));

    int num_rows = splits.size() - 1;
    int num_values = values.size();
    int batch_idx = 0;

    OP_REQUIRES(ctx, splits(0) == 0,
                errors::InvalidArgument("Splits must start with 0, not with ",
                                        splits(0)));

    OP_REQUIRES(ctx, splits(num_rows) == num_values,
                errors::InvalidArgument(
                    "Splits must end with the number of values, got ",
                    splits(num_rows), " instead of ", num_values));

    Tensor* out_t;
    OP_REQUIRES_OK(
        ctx, ctx->allocate_output(0, TensorShape({num_rows, size}), &out_t));
    functor::SetZeroFunctor<Device, T> fill;
    fill(ctx->eigen_device<Device>(), out_t->flat<T>());
    const auto out = out_t->matrix<T>();

    for (int idx = 0; idx < num_values; ++idx) {
      while (idx >= splits(batch_idx)) {
        batch_idx++;
      }
      Tidx bin = values(idx);
      OP_REQUIRES(ctx, bin >= 0,
                  errors::InvalidArgument("Input must be non-negative"));
      if (bin < size) {
        if (binary_output_) {
          out(batch_idx - 1, bin) = T(1);
        } else {
          T value = (weights_size > 0) ? weights(idx) : T(1);
          out(batch_idx - 1, bin) += value;
        }
      }
    }
  }

 private:
  bool binary_output_;
};

#define REGISTER_KERNELS(Tidx, T)                            \
  REGISTER_KERNEL_BUILDER(Name("RaggedBincount")             \
                              .Device(DEVICE_CPU)            \
                              .TypeConstraint<T>("T")        \
                              .TypeConstraint<Tidx>("Tidx"), \
                          RaggedBincountOp<CPUDevice, Tidx, T>);
#define REGISTER_CPU_KERNELS(T) \
  REGISTER_KERNELS(int32, T);   \
  REGISTER_KERNELS(int64, T);

TF_CALL_NUMBER_TYPES(REGISTER_CPU_KERNELS);
#undef REGISTER_CPU_KERNELS
#undef REGISTER_KERNELS

}  // end namespace tensorflow