• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #include "runtime/device/bucket.h"
18 
19 #include "runtime/device/kernel_runtime_manager.h"
20 #include "frontend/parallel/context.h"
21 #include "utils/profile.h"
22 
23 namespace mindspore::device {
AddGradTensor(const tensor::TensorPtr & tensor)24 void Bucket::AddGradTensor(const tensor::TensorPtr &tensor) {
25   if (grad_tensor_list_.size() >= bucket_size_) {
26     MS_LOG(EXCEPTION) << "bucket is full";
27   }
28   grad_tensor_list_.emplace_back(tensor);
29   if (grad_tensor_list_.size() > bucket_size_) {
30     MS_LOG(EXCEPTION) << "too many tensor add to the bucket, bucket_size_:" << bucket_size_
31                       << " total tensor size:" << grad_tensor_list_.size();
32   }
33   MS_LOG(INFO) << "current bucket tensors size:" << grad_tensor_list_.size();
34   // bucket is full, start to launch allreduce
35   if (grad_tensor_list_.size() == bucket_size_) {
36     full_ = true;
37   }
38 }
39 
// Runs the fused AllReduce for a full bucket: packs all gradients into one
// contiguous device buffer, launches the collective, optionally averages the
// result (CalculateMean), and re-points each tensor at its slice of the fused
// output. Requires the bucket to be exactly full; throws otherwise.
void Bucket::Launch() {
  auto start = GetTime();
  if (grad_tensor_list_.size() != bucket_size_) {
    MS_LOG(EXCEPTION) << "Bucket is not full, grad_tensor_list_ size:" << grad_tensor_list_.size()
                      << " bucket_size_:" << bucket_size_;
  }
  MS_LOG(INFO) << "Bucket is full, start to launch AllReduce";
  MS_EXCEPTION_IF_NULL(pre_event_);
  MS_EXCEPTION_IF_NULL(post_event_);
  AllocateAllReduceAddr();
  CopyTensorToContiguousMemory();
  // pre_event_ is recorded after the copy and waited on before the collective —
  // presumably ordering the memcpy ahead of the AllReduce across device
  // streams; confirm against the DeviceEvent implementation.
  pre_event_->RecordEvent();
  pre_event_->WaitEvent();
  LaunchAllReduce();
  // mul fusion
  CalculateMean();
  // post_event_ marks completion of the fused output; each tensor below holds
  // it so later consumers can synchronize on the result.
  post_event_->RecordEvent();
  UpdateTensorAddr();
  // pass event to the tensor
  for (auto &tensor : grad_tensor_list_) {
    MS_EXCEPTION_IF_NULL(tensor);
    tensor->SetDeviceEvent(post_event_);
  }
  MS_LOG(INFO) << "Bucket launch cost:" << (GetTime() - start) * 1e6 << " us";
}
65 
// Swaps every gradient tensor's device pointer for its slice of the fused
// output buffer prepared via UpdateTensorOutputAddr(). Each tensor's original
// pointer is stashed in tensor_old_addr_list_ so LazyDeleteOldAddr() can free
// it later. Throws if the tensor list or the new-address list does not match
// bucket_size_.
void Bucket::UpdateTensorAddr() {
  if (grad_tensor_list_.size() != bucket_size_ || new_tensor_output_addrs_.size() != bucket_size_) {
    MS_LOG(EXCEPTION) << "grad_tensor_list size:" << grad_tensor_list_.size()
                      << " tensor output addr size:" << new_tensor_output_addrs_.size()
                      << " bucket size:" << bucket_size_;
  }

  for (size_t i = 0; i < bucket_size_; ++i) {
    auto &tensor = grad_tensor_list_[i];
    MS_EXCEPTION_IF_NULL(tensor);
    auto device_address = std::dynamic_pointer_cast<DeviceAddress>(tensor->device_address());
    // release old addr and manage addr by this Bucket.
    MS_EXCEPTION_IF_NULL(device_address);
    auto origin_dev_ptr = device_address->GetMutablePtr();
    // Keep the original pointer; it is not freed here because the device may
    // still be reading it — LazyDeleteOldAddr() releases it later.
    tensor_old_addr_list_.emplace_back(origin_dev_ptr);
    // Clearing from_mem_pool_ stops the memory pool from also reclaiming the
    // old pointer; ownership has moved to this Bucket.
    device_address->from_mem_pool_ = false;
    device_address->set_ptr(new_tensor_output_addrs_[i]);
  }
}
85 
CalculateMean()86 void Bucket::CalculateMean() {
87   auto parallel_context = parallel::ParallelContext::GetInstance();
88   MS_EXCEPTION_IF_NULL(parallel_context);
89   auto grad_mean = parallel_context->gradients_mean();
90   if (!grad_mean) {
91     UpdateTensorOutputAddr(ar_output_addr_);
92     return;
93   }
94   if (launch_mul_ == nullptr) {
95     launch_mul_ = CreateLaunchMul();
96     MS_EXCEPTION_IF_NULL(launch_mul_);
97   }
98   // set mul input1 addr
99   launch_mul_->SetInputAddr(ar_output_addr_);
100   // launch mean
101   launch_mul_->LaunchOpKernel();
102   // store tensor output addr
103   auto launch_output = launch_mul_->GetKernelOutputAddr();
104   if (launch_output.size() != 1) {
105     MS_LOG(EXCEPTION) << "launch mul outputs should have one output";
106   }
107   UpdateTensorOutputAddr(launch_output[0]);
108 }
109 
UpdateTensorOutputAddr(uint8_t * addr)110 void Bucket::UpdateTensorOutputAddr(uint8_t *addr) {
111   uint8_t *tensor_output = addr;
112   for (size_t i = 0; i < bucket_size_; ++i) {
113     (void)new_tensor_output_addrs_.emplace_back(tensor_output);
114     tensor_output += align_size_list_[i];
115   }
116 }
117 
LazyDeleteOldAddr()118 void Bucket::LazyDeleteOldAddr() {
119   MS_LOG(INFO) << "Lazy delete old grad address";
120   for (auto old_addr : tensor_old_addr_list_) {
121     FreeDeviceMem(old_addr);
122   }
123   tensor_old_addr_list_.clear();
124 }
125 
Release()126 void Bucket::Release() {
127   MS_LOG(INFO) << "Clear bucket:" << id_;
128   grad_tensor_list_.clear();
129   align_size_list_.clear();
130   new_tensor_output_addrs_.clear();
131   memcpy_input_addrs_.clear();
132   memcpy_output_addrs_.clear();
133   tensor_type_list_.clear();
134   LazyDeleteOldAddr();
135   FreeAllDeviceMem();
136   full_ = false;
137 }
138 }  // namespace mindspore::device
139