/**
 * Copyright 2019 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/device/gpu/gpu_buffer_mgr.h"
#include <cuda_runtime_api.h>
#include <utility>
#include "utils/log_adapter.h"
#include "utils/ms_utils.h"
#include "pybind11/pybind11.h"
#include "pybind11/stl.h"

namespace py = pybind11;

namespace mindspore {
namespace device {
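// Allocates the first free handle id by scanning handle_list_; returns INVALID_HANDLE when all
// MAX_HANDLE_NUM slots are in use.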
unsigned int HandleMgr::AllocHandle() {
  for (size_t i = 0; i < MAX_HANDLE_NUM; ++i) {
    if (!handle_list_[i]) {
      handle_list_[i] = true;
      return (unsigned int)i;
    }
  }
  return INVALID_HANDLE;
}

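// Marks a previously allocated handle id as free again; out-of-range ids are ignored.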
void HandleMgr::FreeHandle(unsigned int handle_id) {
  if (handle_id >= MAX_HANDLE_NUM) {
    return;
  }
  handle_list_[handle_id] = false;
}

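// Returns the process-wide GpuBufferMgr singleton (constructed on first use).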
GpuBufferMgr &GpuBufferMgr::GetInstance() noexcept {
  static GpuBufferMgr instance;
  return instance;
}

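// Creates a blocking queue named "<device_id>_<channel_name>" over the device buffer at `addr`,
// with the given per-item shape and capacity. Fails with QUEUE_EXIST if the name is already taken.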
BlockQueueStatus_T GpuBufferMgr::Create(unsigned int device_id, const std::string &channel_name, void *addr,
                                        const std::vector<size_t> &shape, const size_t &capacity) {
  std::string name = std::to_string(device_id) + std::string("_") + channel_name;
  if (name_queue_map_.count(name)) {
    MS_LOG(ERROR) << "Queue already exist: " << name;
    return QUEUE_EXIST;
  }
  std::shared_ptr<BlockingQueue> queue = std::make_shared<BlockingQueue>();
  BlockQueueStatus_T rt = queue->Create(addr, shape, capacity);
  if (rt != SUCCESS) {
    return rt;
  }
  (void)name_queue_map_.insert(std::make_pair(name, queue));
  init_ = true;
  return SUCCESS;
}

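// Opens an existing queue for the dataset side: allocates a handle, registers the memory-release
// callback `func`, and counts the opener so that CloseNotify can wait for its acknowledgement.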
unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
                                const std::vector<size_t> &shape, const std::function<void(void *, int32_t)> func) {
  set_device();
  std::string name = std::to_string(device_id) + std::string("_") + channel_name;
  if (!name_queue_map_.count(name)) {
    MS_LOG(ERROR) << "Queue not exist " << name;
    return HandleMgr::INVALID_HANDLE;
  }
  unsigned int handle = handle_mgr_.AllocHandle();
  if (handle == HandleMgr::INVALID_HANDLE) {
    MS_LOG(ERROR) << "handle is invalid";
    return HandleMgr::INVALID_HANDLE;
  }
  (void)handle_queue_map_.insert(std::make_pair(handle, name_queue_map_[name]));
  name_queue_map_[name]->RegisterRelease(func);
  open_by_dataset_++;
  return handle;
}

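// Opens an existing queue without registering a release callback and without counting toward the
// CloseNotify handshake.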
unsigned int GpuBufferMgr::Open(unsigned int device_id, const std::string &channel_name,
                                const std::vector<size_t> &shape) {
  set_device();
  std::string name = std::to_string(device_id) + std::string("_") + channel_name;
  if (!name_queue_map_.count(name)) {
    MS_LOG(ERROR) << "Queue not exist " << name;
    return HandleMgr::INVALID_HANDLE;
  }
  unsigned int handle = handle_mgr_.AllocHandle();
  if (handle == HandleMgr::INVALID_HANDLE) {
    MS_LOG(ERROR) << "handle is invalid";
    return HandleMgr::INVALID_HANDLE;
  }
  (void)handle_queue_map_.insert(std::make_pair(handle, name_queue_map_[name]));
  return handle;
}

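// A minimal usage sketch of the Create/Open/Front/Pop/Close flow (illustrative only; the channel
// name, shape, capacity, and buffer values below are hypothetical, not taken from real callers):
//
//   auto &mgr = GpuBufferMgr::GetInstance();
//   (void)mgr.Create(0, "example_channel", device_buffer, {32, 224, 224, 3}, 2);
//   unsigned int handle = mgr.Open(0, "example_channel", {32, 224, 224, 3});
//   void *addr = nullptr;
//   size_t len = 0;
//   if (mgr.Front(handle, &addr, &len) == SUCCESS) {
//     // consume the item at `addr`, then release the slot
//     (void)mgr.Pop(handle);
//   }
//   mgr.Close(handle);
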
void GpuBufferMgr::set_device_id(int device_id) { cur_dev_id_ = device_id; }

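// Binds the calling thread to the CUDA device stored in cur_dev_id_; logs a detailed error if
// cudaSetDevice fails (for example, when 'device_id' is outside the visible GPU range).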
void GpuBufferMgr::set_device() const {
  auto ret = cudaSetDevice(cur_dev_id_);
  if (ret != cudaSuccess) {
    MS_LOG(ERROR)
      << "Set device for id:" << cur_dev_id_ << " failed, ret[" << static_cast<int>(ret) << "], "
      << cudaGetErrorString(ret)
      << ". Please make sure that the 'device_id' set in context is in the range:[0, total number of GPU). "
         "If the environment variable 'CUDA_VISIBLE_DEVICES' is set, the total number of GPU will be the number set "
         "in the environment variable 'CUDA_VISIBLE_DEVICES'. For example, if export CUDA_VISIBLE_DEVICES=4,5,6, the "
         "'device_id' can be 0,1,2 at the moment, 'device_id' starts from 0, and 'device_id'=0 means using GPU of "
         "number 4.";
  }
}

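// Pushes a batch of data items into the queue bound to `handle`, blocking for up to
// `timeout_in_sec` seconds; returns HANDLE_NOT_EXIST for unknown handles.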
BlockQueueStatus_T GpuBufferMgr::Push(unsigned int handle, const std::vector<DataItemGpu> &data,
                                      unsigned int timeout_in_sec) {
  auto iter = handle_queue_map_.find(handle);
  if (iter == handle_queue_map_.end()) {
    return HANDLE_NOT_EXIST;
  }
  return iter->second->Push(data, timeout_in_sec);
}

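// Returns the address and length of the item at the front of the queue without removing it.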
BlockQueueStatus_T GpuBufferMgr::Front(unsigned int handle, void **addr, size_t *len) {
  auto iter = handle_queue_map_.find(handle);
  if (iter == handle_queue_map_.end()) {
    return HANDLE_NOT_EXIST;
  }
  return iter->second->Front(addr, len);
}

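// Removes the item at the front of the queue bound to `handle`.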
BlockQueueStatus_T GpuBufferMgr::Pop(unsigned int handle) {
  auto iter = handle_queue_map_.find(handle);
  if (iter == handle_queue_map_.end()) {
    return HANDLE_NOT_EXIST;
  }
  return iter->second->Pop();
}

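// Detaches `handle` from its queue and releases the handle id; unknown handles are ignored.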
void GpuBufferMgr::Close(unsigned int handle) noexcept {
  if (!handle_queue_map_.count(handle)) {
    return;
  }
  (void)handle_queue_map_.erase(handle);
  handle_mgr_.FreeHandle(handle);
  return;
}

bool GpuBufferMgr::IsInit() const { return init_; }

bool GpuBufferMgr::IsClosed() const { return closed_; }

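// Destroys every named queue; returns false as soon as any queue fails to destroy, otherwise
// clears the name-to-queue map and returns true.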
bool GpuBufferMgr::Destroy() {
  for (auto iter = name_queue_map_.begin(); iter != name_queue_map_.end(); ++iter) {
    std::shared_ptr<BlockingQueue> queue = iter->second;
    if (queue != nullptr) {
      if (!queue->Destroy()) {
        return false;
      }
      queue.reset();
    }
  }
  name_queue_map_.clear();
  return true;
}

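// Checks whether a queue named "<device_id>_<channel_name>" has already been created.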
inline bool GpuBufferMgr::isCreated(unsigned int device_id, const std::string &channel_name) {
  std::string name = std::to_string(device_id) + std::string("_") + channel_name;
  if (name_queue_map_.count(name) != 0) {
    return true;
  }
  return false;
}

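// Shutdown handshake with the dataset threads: releases the Python GIL, sets closed_ so dataset
// retry loops can exit, then waits for one CloseConfirm signal per dataset-side Open.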
bool GpuBufferMgr::CloseNotify() {
  py::gil_scoped_release release;
  bool result = true;
  // lock scope
  {
    std::lock_guard<std::mutex> lk(close_mutex_);
    // set closed_ to true so that all dataset retry loops can exit their while loops
    closed_ = true;
  }

  // wait for the dataset threads' ack
  for (int i = 0; i < open_by_dataset_; i++) {
    if (sema.Wait() == false) {
      MS_LOG(ERROR) << "time out of receiving signals";
      result = false;
    }
    MS_LOG(DEBUG) << "receive one signal (" << i + 1 << "/" << open_by_dataset_ << ")";
  }
  return result;
}

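// Called by each dataset thread to acknowledge the close request issued by CloseNotify.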
void GpuBufferMgr::CloseConfirm() { sema.Signal(); }

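// Returns the current number of items in a queue, looked up either by handle or by
// "<device_id>_<channel_name>"; returns 0 (with an error log) if the lookup fails.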
size_t GpuBufferMgr::Size(unsigned int handle) {
  if (handle == HandleMgr::INVALID_HANDLE) {
    MS_LOG(ERROR) << "handle is invalid";
    return 0;
  }
  if (handle_queue_map_.count(handle) == 0) {
    MS_LOG(ERROR) << "Handle not exist " << handle;
    return 0;
  }
  return handle_queue_map_.at(handle)->Size();
}

size_t GpuBufferMgr::Size(unsigned int device_id, const std::string &channel_name) {
  std::string name = std::to_string(device_id) + std::string("_") + channel_name;
  if (!name_queue_map_.count(name)) {
    MS_LOG(ERROR) << "Queue not exist " << name;
    return 0;
  }
  return name_queue_map_.at(name)->Size();
}

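// Returns the capacity (maximum number of items) of a queue, looked up either by handle or by
// "<device_id>_<channel_name>"; returns 0 (with an error log) if the lookup fails.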
size_t GpuBufferMgr::Capacity(unsigned int handle) {
  if (handle == HandleMgr::INVALID_HANDLE) {
    MS_LOG(ERROR) << "handle is invalid";
    return 0;
  }
  if (handle_queue_map_.count(handle) == 0) {
    MS_LOG(ERROR) << "Handle not exist " << handle;
    return 0;
  }
  return handle_queue_map_.at(handle)->Capacity();
}

size_t GpuBufferMgr::Capacity(unsigned int device_id, const std::string &channel_name) {
  std::string name = std::to_string(device_id) + std::string("_") + channel_name;
  if (!name_queue_map_.count(name)) {
    MS_LOG(ERROR) << "Queue not exist " << name;
    return 0;
  }
  return name_queue_map_.at(name)->Capacity();
}
}  // namespace device
}  // namespace mindspore