/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/device/ascend/ascend_bucket.h"

#include "runtime/mem.h"
#include "external/hccl/hccl.h"
#include "runtime/device/ascend/ascend_memory_pool.h"
#include "backend/kernel_compiler/hccl/hcom_util.h"
#include "runtime/device/memory_manager.h"
#include "runtime/device/kernel_runtime_manager.h"
#include "runtime/device/ascend/ascend_event.h"
#include "runtime/device/ascend/ascend_launch_mul.h"
#include "runtime/device/ascend/ascend_launch_atomic_clean.h"
#include "runtime/hccl_adapter/hccl_adapter.h"
#include "utils/profile.h"

namespace mindspore::device::ascend {
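// Allocates one contiguous device buffer for the fused AllReduce. Records each gradient tensor's
// original and aligned sizes, sums the aligned sizes into total_size_, and takes AllReduce input
// and output buffers of that size from the communication memory pool. Also prepares the
// per-tensor source/destination address pairs used later by CopyTensorToContiguousMemory().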
void AscendBucket::AllocateAllReduceAddr() {
  // Check that the bucket is full
  if (grad_tensor_list_.size() != bucket_size_) {
    MS_LOG(EXCEPTION) << "grad tensor list size:" << grad_tensor_list_.size()
                      << " is not equal to bucket size:" << bucket_size_;
  }

  size_t total_size = 0;
  std::vector<size_t> origin_size_list;
  for (auto &tensor : grad_tensor_list_) {
    MS_EXCEPTION_IF_NULL(tensor);
    tensor_type_list_.emplace_back(tensor->data_type());
    DeviceAddressPtr device_address = std::dynamic_pointer_cast<DeviceAddress>(tensor->device_address());
    MS_EXCEPTION_IF_NULL(device_address);
    auto origin_size = device_address->GetSize();
    auto align_size = MemoryManager::GetCommonAlignSize(origin_size);
    (void)origin_size_list.emplace_back(origin_size);
    (void)align_size_list_.emplace_back(align_size);
    total_size += align_size;
    memcpy_input_addrs_.emplace_back(std::make_shared<kernel::Address>(
      static_cast<uint8_t *>(device_address->GetMutablePtr()), device_address->GetSize()));
  }

  total_size_ = total_size;

  auto runtime_instance = device::KernelRuntimeManager::Instance().GetCurrentKernelRuntime();
  MS_EXCEPTION_IF_NULL(runtime_instance);
  // The AllReduce input and output buffers need to be zeroed before use
  ar_input_addr_ = runtime_instance->MallocCommunicationMemFromMemPool(total_size);
  ar_output_addr_ = runtime_instance->MallocCommunicationMemFromMemPool(total_size);

  // Generate the memcpy output addresses
  uint8_t *memcpy_output = ar_input_addr_;
  if (origin_size_list.size() < bucket_size_ || align_size_list_.size() < bucket_size_) {
    MS_LOG(EXCEPTION) << "Invalid bucket_size_:" << bucket_size_ << " origin_size_list.size:" << origin_size_list.size()
                      << " align_size_list.size:" << align_size_list_.size();
  }
  for (size_t i = 0; i < bucket_size_; ++i) {
    memcpy_output_addrs_.emplace_back(std::make_shared<kernel::Address>(memcpy_output, origin_size_list[i]));
    memcpy_output += align_size_list_[i];
  }
}
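
// Returns a device pointer to the Ascend memory pool.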
void AscendBucket::FreeDeviceMem(void *dev_ptr) { AscendMemoryPool::GetInstance().FreeTensorMem(dev_ptr); }
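
// Frees the fused AllReduce input/output buffers and any device memory held by the mul and
// atomic-clean launch kernels. The pointers handed out by MallocCommunicationMemFromMemPool are
// offset by kMemAlignSize from the underlying allocation, so step back to the original address
// before freeing.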
void AscendBucket::FreeAllDeviceMem() {
  if (ar_input_addr_ != nullptr) {
    uint8_t *origin_dev_addr = ar_input_addr_ - kMemAlignSize;
    FreeDeviceMem(origin_dev_addr);
    ar_input_addr_ = nullptr;
  }
  if (ar_output_addr_ != nullptr) {
    uint8_t *origin_dev_addr = ar_output_addr_ - kMemAlignSize;
    FreeDeviceMem(origin_dev_addr);
    ar_output_addr_ = nullptr;
  }
  // Free the launch mul device memory
  if (launch_mul_ != nullptr) {
    launch_mul_->FreeLaunchDeviceMem();
  }
  // Free the launch atomic clean device memory
  if (launch_atomic_clean_ != nullptr) {
    launch_atomic_clean_->FreeLaunchDeviceMem();
  }
}
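
// Zeroes the AllReduce input buffer, then copies every gradient tensor into its slot of the
// contiguous buffer with rtMemcpyAsync on the compute stream. Destinations sit at the aligned
// offsets recorded in AllocateAllReduceAddr().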
void AscendBucket::CopyTensorToContiguousMemory() {
  // Clear the AllReduce input buffer
  CleanAllReduceInputAddr();
  if (memcpy_input_addrs_.size() < bucket_size_ || memcpy_output_addrs_.size() < bucket_size_) {
    MS_LOG(EXCEPTION) << "Invalid bucket_size_:" << bucket_size_
                      << " memcpy_input_addrs_.size:" << memcpy_input_addrs_.size()
                      << " memcpy_output_addrs_.size:" << memcpy_output_addrs_.size();
  }
  for (size_t i = 0; i < bucket_size_; ++i) {
    MS_EXCEPTION_IF_NULL(memcpy_input_addrs_[i]);
    MS_EXCEPTION_IF_NULL(memcpy_output_addrs_[i]);
    MS_LOG(DEBUG) << "MemcpyAsync dst size:" << memcpy_output_addrs_[i]->size
                  << " src size:" << memcpy_input_addrs_[i]->size;
    if (memcpy_output_addrs_[i]->size < memcpy_input_addrs_[i]->size) {
      MS_LOG(EXCEPTION) << "rtMemcpyAsync dst size < src size";
    }

    auto ret = rtMemcpyAsync(memcpy_output_addrs_[i]->addr, memcpy_output_addrs_[i]->size, memcpy_input_addrs_[i]->addr,
                             memcpy_input_addrs_[i]->size, RT_MEMCPY_DEVICE_TO_DEVICE, compute_stream_);
    if (ret != RT_ERROR_NONE) {
      MS_LOG(EXCEPTION) << "Call rtMemcpyAsync failed, error code:" << ret;
    }
  }
}
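
// Launches the fused AllReduce on the communication stream. All tensors in the bucket must share
// one dtype; the HCCL element count is total_size_ divided by that dtype's size.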
void AscendBucket::LaunchAllReduce() {
  if (tensor_type_list_.empty()) {
    MS_LOG(EXCEPTION) << "No tensor type found";
  }

  // All AllReduce inputs must have the same data type
  auto type = tensor_type_list_[0];
  if (std::any_of(tensor_type_list_.begin(), tensor_type_list_.end(),
                  [&type](TypeId tensor_type) { return type != tensor_type; })) {
    MS_LOG(EXCEPTION) << "AllReduce inputs have different data types";
  }

  auto iter = kConstOpHcomDataTypeMap.find(type);
  if (iter == kConstOpHcomDataTypeMap.end()) {
    MS_LOG(EXCEPTION) << "Unknown data type:" << type;
  }

  uint32_t type_size;
  if (!HcomUtil::GetHcomTypeSize(iter->second, &type_size)) {
    MS_LOG(EXCEPTION) << "Get hcom type size failed";
  }

  if (type_size == 0 || total_size_ % type_size != 0) {
    MS_LOG(EXCEPTION) << "total_size[" << total_size_ << "] is not divisible by type_size[" << type_size << "]";
  }
  auto hccl_count = total_size_ / type_size;

  HcclReduceOp op_type = HcclReduceOp::HCCL_REDUCE_SUM;
  auto hccl_result = hccl::HcclAdapter::GetInstance().HcclAllReduce(ar_input_addr_, ar_output_addr_, hccl_count,
                                                                    iter->second, op_type, stream_);
  if (hccl_result != HCCL_SUCCESS) {
    MS_LOG(EXCEPTION) << "HCCL AllReduce failed, ret:" << hccl_result;
  }
}
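
// Zeroes the AllReduce input buffer by launching an atomic-clean kernel on the compute stream.
// The launch kernel is created lazily on first use.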
void AscendBucket::CleanAllReduceInputAddr() {
  if (launch_atomic_clean_ == nullptr) {
    launch_atomic_clean_ = CreateLaunchAtomicClean();
    MS_EXCEPTION_IF_NULL(launch_atomic_clean_);
  }
  // set atomic clean input addr
  launch_atomic_clean_->SetInputAddr(ar_input_addr_);
  // launch atomic clean
  launch_atomic_clean_->LaunchOpKernel();
}
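
// Builds the mul launch kernel on the communication stream, typed by the bucket's first tensor
// dtype and sized total_size_; presumably used to scale the fused AllReduce result (e.g. to
// average gradients across devices).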
std::shared_ptr<LaunchKernel> AscendBucket::CreateLaunchMul() {
  if (tensor_type_list_.empty()) {
    MS_LOG(EXCEPTION) << "tensor_type_list_ is empty";
  }
  auto launch_mul = std::make_shared<AscendLaunchMul>(stream_, tensor_type_list_[0], total_size_);
  MS_EXCEPTION_IF_NULL(launch_mul);
  return launch_mul;
}
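
// Builds the atomic-clean launch kernel on the compute stream, typed by the bucket's first tensor
// dtype and sized total_size_.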
std::shared_ptr<LaunchKernel> AscendBucket::CreateLaunchAtomicClean() {
  if (tensor_type_list_.empty()) {
    MS_LOG(EXCEPTION) << "tensor_type_list_ is empty";
  }
  auto launch_atomic_clean =
    std::make_shared<AscendLaunchAtomicClean>(compute_stream_, tensor_type_list_[0], total_size_);
  MS_EXCEPTION_IF_NULL(launch_atomic_clean);
  return launch_atomic_clean;
}
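
// Binds the bucket to its compute and communication streams and wires up the two events:
// pre_event_ is recorded on the compute stream and waited on by the communication stream, so the
// AllReduce starts only after the copies finish; post_event_ is recorded on the communication
// stream and waited on by the compute stream, so later compute sees the reduced result.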
void AscendBucket::Init(const std::vector<void *> &compute_streams, const std::vector<void *> &communication_streams) {
  pre_event_ = std::make_shared<AscendEvent>();
  post_event_ = std::make_shared<AscendEvent>();

  if (!compute_streams.empty()) {
    compute_stream_ = compute_streams.front();
  }
  if (!communication_streams.empty()) {
    stream_ = communication_streams.front();
  }
  MS_EXCEPTION_IF_NULL(compute_stream_);
  MS_EXCEPTION_IF_NULL(stream_);

  MS_EXCEPTION_IF_NULL(pre_event_);
  MS_EXCEPTION_IF_NULL(post_event_);
  pre_event_->set_wait_stream(stream_);
  pre_event_->set_record_stream(compute_stream_);
  post_event_->set_wait_stream(compute_stream_);
  post_event_->set_record_stream(stream_);
}
}  // namespace mindspore::device::ascend