/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_
#define TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_

#define EIGEN_USE_THREADS

#include <algorithm>

#include "third_party/eigen3/Eigen/Core"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_types.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

using CPUDevice = Eigen::ThreadPoolDevice;

namespace functor {

// Compute reduction over all outer dimensions.
// Example:
//   input: [32, 32, 256]
//   ->
//   output: [256]
template <typename T, typename AccumT, typename BinaryFunctor>
struct ReduceOuterDimensions {
  template <int num_dims>
  void operator()(const CPUDevice& device,
                  const Eigen::DSizes<Eigen::Index, num_dims>& input_dims,
                  const Tensor& input, Tensor* output) const {
    static_assert(num_dims >= 2, "Input dimensions must be at least 2");

    // Compute inner and outer dim after reshaping into 2d tensor.
    int64 inner_dim = input_dims[num_dims - 1];
    int64 outer_dim = 1;
    for (int i = 0; i < num_dims - 1; ++i) outer_dim *= input_dims[i];

    // Compute block size along the outer dimension for efficiency.
    const int64 parallel_cell_size = inner_dim;
    const int64 total_workload = outer_dim * inner_dim;
    const int64 max_parallelism = total_workload / parallel_cell_size;

    const int64 min_block_workload = 2000;
    const int64 min_block_size =
        Eigen::divup(min_block_workload, parallel_cell_size);
    const int64 max_num_blocks = std::min(
        max_parallelism, Eigen::divup(total_workload, min_block_size));

    // Do not create more blocks than there are threads in a pool.
    const int64 num_threads = device.numThreads();
    const int64 num_blocks = std::min(max_num_blocks, num_threads);

    // Block size along the outer dimension.
    const int64 outer_block_size = Eigen::divup(outer_dim, num_blocks);
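    // Worked example of the blocking arithmetic above (illustrative numbers,
    // assuming the [32, 32, 256] input from the comment on this struct and a
    // hypothetical pool of 16 threads): inner_dim = 256 and
    // outer_dim = 32 * 32 = 1024, so min_block_size = divup(2000, 256) = 8
    // rows, max_num_blocks = min(1024, divup(262144, 8)) = 1024,
    // num_blocks = min(1024, 16) = 16, and
    // outer_block_size = divup(1024, 16) = 64 rows per block.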
    const T* input_data = input.template flat<T>().data();

    // Allocate temporary buffer for partial reductions.
    Tensor buffer(DataTypeToEnum<AccumT>::v(), {num_blocks, inner_dim});
    buffer.template flat<AccumT>().setZero();
    AccumT* buffer_data = buffer.template flat<AccumT>().data();

    using Buffer = Eigen::TensorMap<
        Eigen::Tensor<AccumT, 1, Eigen::RowMajor, Eigen::Index>,
        Eigen::Unaligned>;

    using Input = Eigen::TensorMap<
        Eigen::Tensor<const T, 1, Eigen::RowMajor, Eigen::Index>,
        Eigen::Unaligned>;

    const auto compute = [inner_dim, num_blocks, outer_block_size, buffer_data,
                          input_data, outer_dim](Eigen::Index start,
                                                 Eigen::Index limit) -> void {
      DCHECK(start >= 0 && limit <= num_blocks);
      int64 outer_dim_start = start * outer_block_size;
      int64 outer_dim_limit = limit * outer_block_size;
      outer_dim_limit = std::min(outer_dim, outer_dim_limit);

      Buffer buf(buffer_data + start * inner_dim, inner_dim);
      for (int64 i = outer_dim_start; i < outer_dim_limit; ++i) {
        auto in = Input(input_data + i * inner_dim, inner_dim);
        auto cast = in.template cast<AccumT>();
        buf = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf),
                                         const decltype(cast)>(buf, cast);
      }
    };

    // Compute cost of reducing a single block.
    const int64 compute_size = outer_block_size * inner_dim;
    const int64 compute_input_bytes = compute_size * sizeof(T);
    const Eigen::TensorOpCost cost(
        compute_input_bytes,
        0,  // We'll be mostly writing to L1, assume store cost is 0
        compute_size * Eigen::internal::functor_traits<BinaryFunctor>::Cost);

    device.parallelFor(num_blocks, cost, compute);

    // Aggregate partial results from temporary buffer into first block.
    auto buf0 = Buffer(buffer_data, inner_dim);
    // TODO(ezhulenev): Parallelize this loop for large inner dimensions?
    for (int i = 1; i < num_blocks; ++i) {
      auto buf = Buffer(buffer_data + i * inner_dim, inner_dim);
      buf0 = Eigen::TensorCwiseBinaryOp<BinaryFunctor, const decltype(buf0),
                                        const decltype(buf)>(buf0, buf);
    }

    // Write final result to the output.
    output->template flat<T>() = buf0.template cast<T>();
  }
};

}  // namespace functor
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_REDUX_FUNCTOR_H_
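// Usage sketch (hypothetical call site; `ctx`, `input`, and `output` are
// assumed names, not part of this header). Summing a [32, 32, 256] float
// tensor over its two outer dimensions into a preallocated [256] output,
// accumulating in float, might look like:
//
//   const functor::ReduceOuterDimensions<
//       float, float, Eigen::internal::scalar_sum_op<float>>
//       redux;
//   Eigen::DSizes<Eigen::Index, 3> dims(32, 32, 256);
//   redux(ctx->eigen_device<CPUDevice>(), dims, input, &output);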