1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "runtime/device/bucket.h" 18 19 #include "runtime/device/kernel_runtime_manager.h" 20 #include "frontend/parallel/context.h" 21 #include "utils/profile.h" 22 23 namespace mindspore::device { AddGradTensor(const tensor::TensorPtr & tensor)24void Bucket::AddGradTensor(const tensor::TensorPtr &tensor) { 25 if (grad_tensor_list_.size() >= bucket_size_) { 26 MS_LOG(EXCEPTION) << "bucket is full"; 27 } 28 grad_tensor_list_.emplace_back(tensor); 29 if (grad_tensor_list_.size() > bucket_size_) { 30 MS_LOG(EXCEPTION) << "too many tensor add to the bucket, bucket_size_:" << bucket_size_ 31 << " total tensor size:" << grad_tensor_list_.size(); 32 } 33 MS_LOG(INFO) << "current bucket tensors size:" << grad_tensor_list_.size(); 34 // bucket is full, start to launch allreduce 35 if (grad_tensor_list_.size() == bucket_size_) { 36 full_ = true; 37 } 38 } 39 Launch()40void Bucket::Launch() { 41 auto start = GetTime(); 42 if (grad_tensor_list_.size() != bucket_size_) { 43 MS_LOG(EXCEPTION) << "Bucket is not full, grad_tensor_list_ size:" << grad_tensor_list_.size() 44 << " bucket_size_:" << bucket_size_; 45 } 46 MS_LOG(INFO) << "Bucket is full, start to launch AllReduce"; 47 MS_EXCEPTION_IF_NULL(pre_event_); 48 MS_EXCEPTION_IF_NULL(post_event_); 49 AllocateAllReduceAddr(); 50 CopyTensorToContiguousMemory(); 51 pre_event_->RecordEvent(); 52 pre_event_->WaitEvent(); 53 LaunchAllReduce(); 54 // mul fusion 55 CalculateMean(); 56 post_event_->RecordEvent(); 57 UpdateTensorAddr(); 58 // pass event to the tensor 59 for (auto &tensor : grad_tensor_list_) { 60 MS_EXCEPTION_IF_NULL(tensor); 61 tensor->SetDeviceEvent(post_event_); 62 } 63 MS_LOG(INFO) << "Bucket launch cost:" << (GetTime() - start) * 1e6 << " us"; 64 } 65 UpdateTensorAddr()66void Bucket::UpdateTensorAddr() { 67 if (grad_tensor_list_.size() != bucket_size_ || new_tensor_output_addrs_.size() != bucket_size_) { 68 MS_LOG(EXCEPTION) << "grad_tensor_list size:" << grad_tensor_list_.size() 69 << " tensor output addr size:" << new_tensor_output_addrs_.size() 70 << " bucket size:" << bucket_size_; 71 } 72 73 for (size_t i = 0; i < bucket_size_; ++i) { 74 auto &tensor = grad_tensor_list_[i]; 75 MS_EXCEPTION_IF_NULL(tensor); 76 auto device_address = std::dynamic_pointer_cast<DeviceAddress>(tensor->device_address()); 77 // release old addr and manage addr by this Bucket. 78 MS_EXCEPTION_IF_NULL(device_address); 79 auto origin_dev_ptr = device_address->GetMutablePtr(); 80 tensor_old_addr_list_.emplace_back(origin_dev_ptr); 81 device_address->from_mem_pool_ = false; 82 device_address->set_ptr(new_tensor_output_addrs_[i]); 83 } 84 } 85 CalculateMean()86void Bucket::CalculateMean() { 87 auto parallel_context = parallel::ParallelContext::GetInstance(); 88 MS_EXCEPTION_IF_NULL(parallel_context); 89 auto grad_mean = parallel_context->gradients_mean(); 90 if (!grad_mean) { 91 UpdateTensorOutputAddr(ar_output_addr_); 92 return; 93 } 94 if (launch_mul_ == nullptr) { 95 launch_mul_ = CreateLaunchMul(); 96 MS_EXCEPTION_IF_NULL(launch_mul_); 97 } 98 // set mul input1 addr 99 launch_mul_->SetInputAddr(ar_output_addr_); 100 // launch mean 101 launch_mul_->LaunchOpKernel(); 102 // store tensor output addr 103 auto launch_output = launch_mul_->GetKernelOutputAddr(); 104 if (launch_output.size() != 1) { 105 MS_LOG(EXCEPTION) << "launch mul outputs should have one output"; 106 } 107 UpdateTensorOutputAddr(launch_output[0]); 108 } 109 UpdateTensorOutputAddr(uint8_t * addr)110void Bucket::UpdateTensorOutputAddr(uint8_t *addr) { 111 uint8_t *tensor_output = addr; 112 for (size_t i = 0; i < bucket_size_; ++i) { 113 (void)new_tensor_output_addrs_.emplace_back(tensor_output); 114 tensor_output += align_size_list_[i]; 115 } 116 } 117 LazyDeleteOldAddr()118void Bucket::LazyDeleteOldAddr() { 119 MS_LOG(INFO) << "Lazy delete old grad address"; 120 for (auto old_addr : tensor_old_addr_list_) { 121 FreeDeviceMem(old_addr); 122 } 123 tensor_old_addr_list_.clear(); 124 } 125 Release()126void Bucket::Release() { 127 MS_LOG(INFO) << "Clear bucket:" << id_; 128 grad_tensor_list_.clear(); 129 align_size_list_.clear(); 130 new_tensor_output_addrs_.clear(); 131 memcpy_input_addrs_.clear(); 132 memcpy_output_addrs_.clear(); 133 tensor_type_list_.clear(); 134 LazyDeleteOldAddr(); 135 FreeAllDeviceMem(); 136 full_ = false; 137 } 138 } // namespace mindspore::device 139