/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_
#define TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_

#include "tensorflow/core/kernels/typed_conditional_accumulator_base.h"

namespace tensorflow {

/**
 * An aggregation object for adding sparse gradients, represented as a tuple of
 * indices, values, and a (possibly empty) shape.
 *
 * The two main methods of this class are TryApplyGrad and TryTakeGrad.
 *
 * TryApplyGrad tries to add a gradient to the accumulator. The attempt is
 * successful if local_step >= global_step, i.e., if the gradient is not stale,
 * having been computed using up-to-date information. Otherwise, the gradient
 * is silently dropped.
 *
 * TryTakeGrad logs an attempt to read the average gradient. The attempt is
 * blocked until the number of gradients accumulated (via TryApplyGrad) equals
 * or exceeds the number requested by TryTakeGrad.
 * Once this condition is satisfied, the following actions are taken:
 * (1) the value of the average gradient is returned
 * (2) the count of accumulated gradients is reset to 0
 * (3) the internal global_step value (current_global_step_) is incremented by 1
 *
 * SparseConditionalAccumulator is the datatype-dependent templated sub-class
 * of ConditionalAccumulatorBase. It implements the virtual arithmetic methods
 * that are used for aggregating, averaging, allocating, and returning indexed
 * slices.
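 *
 * For example, a sparse gradient touching rows 0 and 3 of a dense [4, 2]
 * tensor is represented as the tuple indices = [0, 3],
 * values = [[g00, g01], [g30, g31]], and shape = [4, 2].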
 */
template <typename Device, typename T>
class SparseConditionalAccumulator
    : public TypedConditionalAccumulatorBase<
          std::tuple<const Tensor*, const Tensor*, const Tensor*>> {
 public:
  SparseConditionalAccumulator(const DataType& dtype,
                               const PartialTensorShape& shape,
                               const string& name, const string& reduction_type)
      : TypedConditionalAccumulatorBase<
            std::tuple<const Tensor*, const Tensor*, const Tensor*>>(
            dtype, shape, name, reduction_type),
        accum_val_(std::make_unique<Tensor>()) {}

 protected:
  std::unique_ptr<std::vector<int64>> accum_idx_vec_;
  std::unique_ptr<std::vector<int>> count_element_;

  std::unique_ptr<Tensor> accum_val_;

  typedef Eigen::TensorMap<Eigen::Tensor<T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceT;
  typedef Eigen::TensorMap<Eigen::Tensor<const T, 1, Eigen::RowMajor>,
                           Eigen::Unaligned>
      SliceConstT;

  Status ValidateShape(
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* tensor,
      bool has_known_shape) TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const Tensor* tensor_idx = std::get<0>(*tensor);
    const Tensor* tensor_val = std::get<1>(*tensor);
    const Tensor* tensor_shape = std::get<2>(*tensor);
    int64_t grad_val_dims = tensor_val->dims();
    int64_t grad_dims = grad_val_dims;

    // Compare with provided shape
    if (has_known_shape) {
      if (shape_.dims() > tensor_shape->NumElements()) {
        return errors::InvalidArgument(
            "Shape mismatch: expected shape rank at least ", shape_.dims(),
            ", got ", tensor_shape->NumElements());
      }
      const auto tensor_shape_flat = tensor_shape->flat<int64>();
      for (int64_t i = 0; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_shape_flat(i)) {
          return errors::InvalidArgument("Shape mismatch: expected shape dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_shape_flat(i));
        }
      }
    }
    // Check that indices are within limits
    if (shape_.dims() > 0 && shape_.dim_size(0) != -1 &&
        tensor_idx->dims() > 0) {
      for (int64_t i = 0; i < tensor_idx->dim_size(0); i++) {
        if (tensor_idx->vec<int64>()(i) >= shape_.dim_size(0)) {
          return errors::InvalidArgument(
              "Shape mismatch: index of slice ", i, " exceeded limits of shape",
              "; index is ", tensor_idx->vec<int64>()(i), " exceeded ",
              shape_.dim_size(0));
        }
      }
    }

    // Check values compatibility with accumulated gradient if available
    if (counter_ > 0) {
      int64_t accum_val_dims = accum_val_->dims();
      if (accum_val_dims != grad_val_dims) {
        return errors::InvalidArgument("Shape mismatch: expected values rank ",
                                       accum_val_dims, ", got ", grad_val_dims);
      }
      for (int64_t i = 1; i < accum_val_dims; i++) {
        if (accum_val_->dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", accum_val_->dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    } else {
      // If there are no accumulated gradients, check against shape_
      if (shape_.dims() > grad_dims) {
        return errors::InvalidArgument(
            "Shape mismatch: expected values rank at least ", shape_.dims(),
            ", got ", grad_dims);
      }
      // Check that values have correct dimensions
      for (int64_t i = 1; i < shape_.dims(); i++) {
        if (shape_.dim_size(i) != -1 &&
            shape_.dim_size(i) != tensor_val->dim_size(i)) {
          return errors::InvalidArgument("Shape mismatch: expected values dim ",
                                         i, " to be ", shape_.dim_size(i),
                                         ", got ", tensor_val->dim_size(i));
        }
      }
    }

    return Status::OK();
  }

  void AllocateAndAssignToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64_t nnz = grad_idx->dim_size(0);

    // Assign indices
    accum_idx_vec_ = std::make_unique<std::vector<int64>>();
    accum_idx_vec_->reserve(nnz);
    for (int i = 0; i < nnz; i++) {
      accum_idx_vec_->push_back(grad_idx->vec<int64>()(i));
    }

    // Assign values to accum_val_tensor
    OP_REQUIRES_OK(
        ctx, ctx->allocate_temp(dtype_, grad_val->shape(), accum_val_.get()));
    accum_val_->flat<T>().device(ctx->template eigen_device<Device>()) =
        grad_val->flat<T>();

    // Assign count_element_
    count_element_ = std::make_unique<std::vector<int>>(nnz, 1);

    // Do not need shape; assume that the op has checked that the shapes match,
    // so grad's shape == shape_
  }

  void AddToAccumGradFunction(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>* grad) override {
    // Modeled after third_party/tensorflow/core/kernels/sparse_add_op

    const Tensor* grad_idx = std::get<0>(*grad);
    const Tensor* grad_val = std::get<1>(*grad);

    const int64_t accum_nnz = accum_idx_vec_->size();
    const int64_t grad_nnz = grad_idx->dim_size(0);

    // Source enumerates the origin of a non-zero element: whether it is from
    // the new gradient, the accumulated gradient, or the sum of both.
    enum Source { from_accum, from_grad, from_accum_and_grad };

    // (1) Do a pass over inputs, and append values and indices to vectors.
    std::vector<std::tuple<Source, int64, int64>> entries_to_copy;
    entries_to_copy.reserve(accum_nnz + grad_nnz);

    // Pass over all non-zero elements of both the gradient and the accumulated
    // value, to identify where each non-zero element of the sum comes from.
    // The input and output indexed slices are assumed to be ordered along
    // increasing dimension number.
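    // Illustrative example: merging accumulated indices [0, 2, 5] with
    // gradient indices [2, 3] yields entries_to_copy =
    //   {(from_accum, 0, -1), (from_accum_and_grad, 1, 0),
    //    (from_grad, -1, 1), (from_accum, 2, -1)},
    // i.e. summed indices [0, 2, 3, 5], where the slice for index 2 is the
    // sum of the accumulated slice and the new gradient slice.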
    int64_t i = 0, j = 0;
    int64_t sum_nnz = 0;
    while (i < accum_nnz && j < grad_nnz) {
      sum_nnz++;
      switch (cmp(accum_idx_vec_.get(), grad_idx, i, j)) {
        case -1:
          entries_to_copy.emplace_back(from_accum, i, -1);
          ++i;
          break;
        case 0:
          entries_to_copy.emplace_back(from_accum_and_grad, i, j);
          ++i;
          ++j;
          break;
        case 1:
          entries_to_copy.emplace_back(from_grad, -1, j);
          ++j;
          break;
      }
    }

    // Handle leftovers
    while (i < accum_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_accum, i, -1);
      ++i;
    }
    while (j < grad_nnz) {
      sum_nnz++;
      entries_to_copy.emplace_back(from_grad, -1, j);
      ++j;
    }

    // (2) Copy or sum the non-zero elements into sum_indices and sum_tensor
    std::vector<int64>* sum_indices_vec = new std::vector<int64>();
    sum_indices_vec->reserve(sum_nnz);

    std::vector<int>* sum_counts = new std::vector<int>();
    sum_counts->reserve(sum_nnz);

    Tensor* sum_tensor = new Tensor();

    TensorShape sum_shape = grad_val->shape();
    sum_shape.set_dim(0, sum_nnz);

    OP_REQUIRES_OK(ctx, ctx->allocate_temp(dtype_, sum_shape, sum_tensor));
    auto sum_flat = sum_tensor->flat_outer_dims<T>();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    auto grad_flat = grad_val->flat_outer_dims<T>();

    const int64_t num_col = grad_flat.dimension(1);

    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(num_col);

    for (i = 0; i < sum_nnz; ++i) {
      const Source src = std::get<0>(entries_to_copy[i]);
      const int64_t idx_a = std::get<1>(entries_to_copy[i]);
      const int64_t idx_b = std::get<2>(entries_to_copy[i]);
      T* sum_slice_ptr = &sum_flat(i, 0);
      SliceT sum_slice(sum_slice_ptr, slice_shape);
      if (src == from_accum) {
        // Element comes from accumulator; directly copy data structures over
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = accum_slice;
        sum_counts->push_back(count_element_->at(idx_a));
      } else if (src == from_accum_and_grad) {
        // Element is a sum of accumulated value and new gradient;
        // compute sum here
        sum_indices_vec->push_back(accum_idx_vec_->at(idx_a));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        T* accum_slice_ptr = &accum_flat(idx_a, 0);
        SliceT accum_slice(accum_slice_ptr, slice_shape);
        sum_slice = grad_slice + accum_slice;
        sum_counts->push_back(count_element_->at(idx_a) + 1);
      } else if (src == from_grad) {
        // Element comes from new gradient; make a copy of indices and values
        sum_indices_vec->push_back(grad_idx->vec<int64>()(idx_b));
        const T* grad_slice_ptr = &grad_flat(idx_b, 0);
        SliceConstT grad_slice(grad_slice_ptr, slice_shape);
        sum_slice = grad_slice;
        sum_counts->push_back(1);
      }
    }

    // (3) Keep output, i.e., switch pointers to point to new data structures
    // representing the sum
    // Indices
    accum_idx_vec_.reset(sum_indices_vec);
    // Values
    accum_val_.reset(sum_tensor);
    // Counts
    count_element_.reset(sum_counts);

    // No need to copy shape, since shape remains the same after sum.
  }
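  // Note on averaging: each accumulated slice is divided by the number of
  // gradients that actually contributed to that slice (count_element_),
  // not by the global gradient counter. For example, if three gradients
  // were applied but only two of them contained row 5, the accumulated
  // slice for row 5 is divided by 2.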
  void DivideAccumGradByCounter(OpKernelContext* ctx) override
      TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    const int64_t nnz = count_element_->size();
    auto accum_flat = accum_val_->flat_outer_dims<T>();
    std::vector<T> count_typet;
    std::transform(count_element_->begin(), count_element_->end(),
                   std::back_inserter(count_typet),
                   TypeConverter<T, int>::ConvertUToT);

    // Option 1: divide all by counter
    /*
    std::transform(
        &accum_flat(0,0), &accum_flat(nnz,0), &accum_flat(0,0),
        std::bind2nd(std::divides<T>(),
                     TypeConverter<T, int>::ConvertUToT(this->counter_)));
    */

    // Option 2: average element-wise
    Eigen::DSizes<Eigen::DenseIndex, 1> slice_shape(accum_flat.dimension(1));
    for (int64_t i = 0; i < nnz; i++) {
      T* accum_slice_ptr = &accum_flat(i, 0);
      SliceT accum_slice(accum_slice_ptr, slice_shape);
      accum_slice.device(ctx->template eigen_device<Device>()) =
          accum_slice / count_typet[i];
    }
  }

  bool SetOutput(OpKernelContext* ctx) override {
    bool is_successful = true;
    if (is_successful) is_successful = ReturnIdxTensor(ctx);
    if (is_successful) is_successful = ReturnValTensor(ctx);
    if (is_successful) is_successful = ReturnShapeTensor(ctx);
    return is_successful;
  }

  bool GetAndValidateTensorInputForApplyGrad(
      OpKernelContext* ctx,
      std::tuple<const Tensor*, const Tensor*, const Tensor*>** tensor) override
      TF_EXCLUSIVE_LOCKS_REQUIRED(this->mu_) {
    // TODO(xinghao, jmchen): The roundabout way of getting attr from
    // OpKernelContext (instead of OpKernelConstruction) is a hack, and should
    // be fixed if it affects efficiency.
    bool has_known_shape = false;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, GetNodeAttr(ctx->op_kernel().def(), "has_known_shape",
                         &has_known_shape));

    // Get input gradient tensors
    const Tensor* grad_idx_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_indices", &grad_idx_tensor));
    const Tensor* grad_val_tensor;
    OP_REQUIRES_OK_BOOLEAN(ctx,
                           ctx->input("gradient_values", &grad_val_tensor));
    const Tensor* grad_shape_tensor = nullptr;
    if (has_known_shape) {
      OP_REQUIRES_OK_BOOLEAN(ctx,
                             ctx->input("gradient_shape", &grad_shape_tensor));
    }

    // Checks
    OP_REQUIRES_BOOLEAN(
        ctx, TensorShapeUtils::IsVector(grad_idx_tensor->shape()),
        errors::InvalidArgument(
            "Input indices should be vector but received shape: ",
            grad_idx_tensor->shape().DebugString()));
    const int64_t nnz = grad_idx_tensor->dim_size(0);
    OP_REQUIRES_BOOLEAN(
        ctx, grad_val_tensor->dims() > 0,
        errors::InvalidArgument("Values cannot be 0-dimensional."));
    OP_REQUIRES_BOOLEAN(ctx, grad_val_tensor->dim_size(0) == nnz,
                        errors::InvalidArgument("Expected ", nnz,
                                                " non-empty input values, got ",
                                                grad_val_tensor->dim_size(0)));

    *tensor = new std::tuple<const Tensor*, const Tensor*, const Tensor*>(
        grad_idx_tensor, grad_val_tensor, grad_shape_tensor);

    OP_REQUIRES_OK_BOOLEAN(ctx, this->ValidateShape(*tensor, has_known_shape));

    return true;
  }

  void CleanUpGradTensor(std::tuple<const Tensor*, const Tensor*,
                                    const Tensor*>* tensor) override {
    if (tensor != nullptr) delete tensor;
  }

 private:
  inline int cmp(std::vector<int64>* a_idx, const Tensor* b_idx,
                 const int64_t a_row, const int64_t b_row) {
    const int64_t a = a_idx->at(a_row);
    const int64_t b = b_idx->vec<int64>()(b_row);
    if (a < b) {
      return -1;
    } else if (a > b) {
      return 1;
    }
    return 0;
  }

  inline bool ReturnIdxTensor(OpKernelContext* ctx) {
    Tensor* idx_tensor;
    const int64_t nnz = accum_idx_vec_->size();
    OP_REQUIRES_OK_BOOLEAN(ctx, ctx->allocate_output(0, {nnz}, &idx_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false
    auto idx_tensor_vec = idx_tensor->vec<int64>();
    for (int i = 0; i < nnz; ++i) {
      idx_tensor_vec(i) = accum_idx_vec_->at(i);
    }
    return true;
  }

  inline bool ReturnValTensor(OpKernelContext* ctx) {
    ctx->set_output(1, *accum_val_);
    return true;
  }

  inline bool ReturnShapeTensor(OpKernelContext* ctx) {
    int64_t accum_val_dims = accum_val_->dims();
    Tensor* shape_tensor;
    OP_REQUIRES_OK_BOOLEAN(
        ctx, ctx->allocate_output(2, {accum_val_dims}, &shape_tensor));
    // If allocate_output fails, OP_REQUIRES_OK_BOOLEAN will short-circuit
    // the remaining code and just return false

    // First dim of shape is defined by shape_, others by accum_val_->shape
    shape_tensor->flat<int64>()(0) =
        (shape_.dims() > 0) ? shape_.dim_size(0) : -1;
    for (int64_t i = 1; i < accum_val_dims; i++) {
      shape_tensor->flat<int64>()(i) = accum_val_->dim_size(i);
    }
    return true;
  }

  TF_DISALLOW_COPY_AND_ASSIGN(SparseConditionalAccumulator);
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_SPARSE_CONDITIONAL_ACCUMULATOR_H_