/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/device/gpu/gpu_bucket.h"

#include <cuda_runtime_api.h>
#include <nccl.h>
#include <dlfcn.h>
#include <algorithm>
#include <vector>
#include <memory>
#include "abstract/utils.h"
#include "runtime/device/gpu/gpu_event.h"
#include "runtime/device/gpu/gpu_memory_allocator.h"
#include "runtime/device/gpu/gpu_device_manager.h"
#include "runtime/device/kernel_runtime_manager.h"
#include "runtime/device/gpu/distribution/collective_init.h"
#include "runtime/device/gpu/gpu_launch_mul.h"
#include "backend/kernel_compiler/gpu/nccl/nccl_gpu_kernel.h"
#include "runtime/device/gpu/gpu_common.h"

namespace {
const size_t kCommunicationMemAlignSize = 16;
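// Round a buffer size up to the next multiple of kCommunicationMemAlignSize, so each gradient occupies an
// aligned slot in the fused communication buffer (e.g. 100 bytes -> 112, 0 bytes -> 16).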
size_t AlignMemorySize(size_t size) {
  if (size == 0) {
    return kCommunicationMemAlignSize;
  }
  return ((size + kCommunicationMemAlignSize - 1) / kCommunicationMemAlignSize) * kCommunicationMemAlignSize;
}
}  // namespace
namespace mindspore::device::gpu {
GPUBucket::GPUBucket(uint32_t id, uint32_t bucket_size) : Bucket(id, bucket_size), collective_handle_(nullptr) {
  group_ = kNcclWorldGroup;
}

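// Collect the size and dtype of every gradient tensor in the bucket, allocate two contiguous device buffers
// (AllReduce input and output) large enough to hold all gradients at aligned offsets, and record the
// source/destination addresses later used to pack the gradients into the fused input buffer.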
void GPUBucket::AllocateAllReduceAddr() {
  MS_LOG(INFO) << "start";
  if (grad_tensor_list_.size() != bucket_size_) {
    MS_LOG(EXCEPTION) << "grad tensor list size:" << grad_tensor_list_.size()
                      << " is not equal to bucket size:" << bucket_size_;
  }

  size_t total_size = 0;
  std::vector<size_t> size_list;
  for (auto &tensor : grad_tensor_list_) {
    MS_EXCEPTION_IF_NULL(tensor);
    tensor_type_list_.emplace_back(tensor->data_type());
    DeviceAddressPtr device_address = std::dynamic_pointer_cast<DeviceAddress>(tensor->device_address());
    MS_EXCEPTION_IF_NULL(device_address);
    auto origin_size = device_address->GetSize();
    auto align_size = AlignMemorySize(origin_size);
    size_list.emplace_back(origin_size);
    align_size_list_.emplace_back(align_size);
    total_size += align_size;
    memcpy_input_addrs_.emplace_back(
      std::make_shared<kernel::Address>(static_cast<uint8_t *>(device_address->GetMutablePtr()), origin_size));
  }
  total_size_ = total_size;

  ar_input_addr_ = static_cast<uint8_t *>(GPUMemoryAllocator::GetInstance().AllocTensorMem(total_size));
  ar_output_addr_ = static_cast<uint8_t *>(GPUMemoryAllocator::GetInstance().AllocTensorMem(total_size));

  uint8_t *memcpy_output = ar_input_addr_;
  for (size_t i = 0; i < bucket_size_; ++i) {
    memcpy_output_addrs_.emplace_back(std::make_shared<kernel::Address>(memcpy_output, size_list[i]));
    memcpy_output += align_size_list_[i];
  }
  MS_LOG(INFO) << "end";
}

void GPUBucket::FreeDeviceMem(void *dev_ptr) { GPUMemoryAllocator::GetInstance().FreeTensorMem(dev_ptr); }

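// Release the fused AllReduce input/output buffers back to the GPU memory pool and let the launch-mul helper
// free any device memory it allocated for its own launch.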
void GPUBucket::FreeAllDeviceMem() {
  MS_LOG(INFO) << "start";
  if (ar_input_addr_ != nullptr) {
    FreeDeviceMem(ar_input_addr_);
    ar_input_addr_ = nullptr;
  }
  if (ar_output_addr_ != nullptr) {
    FreeDeviceMem(ar_output_addr_);
    ar_output_addr_ = nullptr;
  }
  // clear launch mul device memory
  if (launch_mul_ != nullptr) {
    launch_mul_->FreeLaunchDeviceMem();
  }
  MS_LOG(INFO) << "end";
}

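// Zero the fused AllReduce input buffer, then asynchronously copy each gradient from its original device
// address into its aligned slot. Both operations are issued on the compute stream, so they are ordered after
// earlier work queued on that stream.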
void GPUBucket::CopyTensorToContiguousMemory() {
  MS_LOG(INFO) << "start";
  MS_EXCEPTION_IF_NULL(compute_stream_);
  // Clean allreduce input
  CHECK_CUDA_RET_WITH_EXCEPT_NOTRACE(
    cudaMemsetAsync(ar_input_addr_, 0, total_size_, static_cast<cudaStream_t>(compute_stream_)),
    "Call cudaMemsetAsync failed");

  for (size_t i = 0; i < bucket_size_; ++i) {
    MS_EXCEPTION_IF_NULL(memcpy_output_addrs_[i]);
    MS_EXCEPTION_IF_NULL(memcpy_input_addrs_[i]);
    if (!GPUDeviceManager::GetInstance().CopyDeviceMemToDeviceAsync(memcpy_output_addrs_[i]->addr,
                                                                    memcpy_input_addrs_[i]->addr,
                                                                    memcpy_output_addrs_[i]->size, compute_stream_)) {
      MS_LOG(EXCEPTION) << "Copy memory failed";
    }
  }
  MS_LOG(INFO) << "end";
}

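// Resolve the "AllReduce" symbol from the collective library loaded by CollectiveInitializer, check that all
// gradients in the bucket share one dtype, map that dtype to its NCCL counterpart via kNcclDtypeMap, and
// launch a sum AllReduce over the fused buffer (total_size_ / type_size elements) on the communication stream.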
void GPUBucket::LaunchAllReduce() {
  MS_LOG(INFO) << "start";
  collective_handle_ = device::gpu::CollectiveInitializer::instance().collective_handle();
  auto all_reduce_funcptr =
    reinterpret_cast<kernel::AllReduce>(dlsym(const_cast<void *>(collective_handle_), "AllReduce"));
  MS_EXCEPTION_IF_NULL(all_reduce_funcptr);
  MS_EXCEPTION_IF_NULL(stream_);

  if (tensor_type_list_.empty()) {
    MS_LOG(EXCEPTION) << "No tensor type found";
  }
  auto type = tensor_type_list_[0];
  if (std::any_of(tensor_type_list_.begin(), tensor_type_list_.end(),
                  [&type](TypeId tensor_type) { return type != tensor_type; })) {
    MS_LOG(EXCEPTION) << "AllReduce inputs have different dtypes";
  }

  auto type_size = abstract::TypeIdSize(type);
  if (type_size == 0) {
    MS_LOG(EXCEPTION) << "Invalid type:" << type;
  }

  // typeid to nccl_data_type
  auto nccl_data_type_iter = kernel::kNcclDtypeMap.find(TypeIdLabel(type));
  if (nccl_data_type_iter == kernel::kNcclDtypeMap.end()) {
    MS_LOG(EXCEPTION) << "Invalid type:" << type;
  }

  auto nccl_result =
    (*all_reduce_funcptr)(ar_input_addr_, ar_output_addr_, total_size_ / type_size, nccl_data_type_iter->second,
                          ncclRedOp_t::ncclSum, static_cast<cudaStream_t>(stream_), group_);
  if (nccl_result != ncclSuccess) {
    MS_LOG(EXCEPTION) << "AllReduce failed, ret:" << nccl_result;
  }

  MS_LOG(INFO) << "end";
}

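// Build the element-wise Mul launcher applied to the AllReduce result (presumably to scale the summed
// gradients, e.g. averaging across devices); it is configured with the bucket's single dtype and the total
// fused size.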
std::shared_ptr<LaunchKernel> GPUBucket::CreateLaunchMul() {
  if (tensor_type_list_.empty()) {
    MS_LOG(EXCEPTION) << "tensor_type_list_ is empty";
  }
  auto launch_mul = std::make_shared<GPULaunchMul>(stream_, tensor_type_list_[0], total_size_);
  MS_EXCEPTION_IF_NULL(launch_mul);
  return launch_mul;
}

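// Bind the bucket to the first compute stream and the first communication stream, and wire two events for
// cross-stream ordering: pre_event_ is recorded on the compute stream and waited on by the communication
// stream (gradients are ready before AllReduce starts); post_event_ is recorded on the communication stream
// and waited on by the compute stream (AllReduce has finished before its result is consumed).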
void GPUBucket::Init(const std::vector<void *> &compute_streams, const std::vector<void *> &communication_streams) {
  pre_event_ = std::make_shared<GpuEvent>();
  post_event_ = std::make_shared<GpuEvent>();

  if (!compute_streams.empty()) {
    compute_stream_ = compute_streams.front();
  }
  if (!communication_streams.empty()) {
    stream_ = communication_streams.front();
  }
  MS_EXCEPTION_IF_NULL(compute_stream_);
  MS_EXCEPTION_IF_NULL(stream_);

  MS_EXCEPTION_IF_NULL(pre_event_);
  MS_EXCEPTION_IF_NULL(post_event_);
  pre_event_->set_record_stream(compute_stream_);
  pre_event_->set_wait_stream(stream_);
  post_event_->set_record_stream(stream_);
  post_event_->set_wait_stream(compute_stream_);
}
}  // namespace mindspore::device::gpu