1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "runtime/device/gpu/blocking_queue.h"
18 #include <chrono>
19 #include "runtime/device/gpu/queue_common.h"
20 #include "utils/ms_utils.h"
21
namespace mindspore {
namespace device {
// Producer-side wait budget used by BlockingQueue::Push, in MICROSECONDS
// (it is passed to std::chrono::microseconds below).
const size_t kTimeout = 100;
25
GpuQueue(void * addr,const std::vector<size_t> & shape,const size_t & capacity)26 GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity)
27 : buffer_(addr),
28 head_(0),
29 tail_(0),
30 shape_(shape),
31 len_(0),
32 size_(0),
33 capacity_(capacity),
34 stream_(0),
35 node_info_(nullptr) {
36 CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
37 node_info_ = std::make_unique<NodeInfo[]>(capacity);
38 for (auto item : shape) {
39 len_ += item;
40 }
41 }
42
// The queue does not own `buffer_` (it is supplied by the creator), so the
// destructor only drops the reference.  The CUDA stream is released through
// Destroy(), not here.  NOTE(review): if Destroy() is never called the
// stream (and any undestroyed per-slot events) leak -- confirm the owner
// always invokes Destroy() before destruction.
GpuQueue::~GpuQueue() { buffer_ = nullptr; }
44
Push(const std::vector<DataItemGpu> & data)45 BlockQueueStatus_T GpuQueue::Push(const std::vector<DataItemGpu> &data) {
46 int offset = 0;
47 for (size_t i = 0; i < data.size(); i++) {
48 auto item = data[i];
49 if (item.data_ptr_ == nullptr || item.data_len_ != shape_[i]) {
50 MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
51 return ERROR_INPUT;
52 }
53
54 void *addr = reinterpret_cast<unsigned char *>(buffer_) + tail_ * len_ + offset;
55 CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr_, item.data_len_, cudaMemcpyHostToDevice, stream_),
56 "Cuda Memcpy Error");
57
58 offset += item.data_len_;
59 }
60
61 node_info_[tail_].event_.reset(new cudaEvent_t());
62 CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed");
63 CHECK_CUDA_RET_WITH_ERROR(cudaEventRecord(*(node_info_[tail_].event_), stream_), "Cuda Create Event Failed");
64 node_info_[tail_].data_ = data;
65 tail_ = (tail_ + 1) % (capacity_);
66 ++size_;
67 return SUCCESS;
68 }
69
Front(void ** addr,size_t * len) const70 BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const {
71 CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
72 CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
73 *addr = (unsigned char *)buffer_ + head_ * len_;
74 *len = len_;
75
76 for (auto item : node_info_[head_].data_) {
77 host_release_(item.data_ptr_, item.worker_id_);
78 }
79 return SUCCESS;
80 }
81
Pop()82 BlockQueueStatus_T GpuQueue::Pop() {
83 head_ = (head_ + 1) % (capacity_);
84 --size_;
85 return SUCCESS;
86 }
87
Destroy()88 bool GpuQueue::Destroy() {
89 if (stream_ != nullptr) {
90 auto ret = cudaStreamDestroy(stream_);
91 if (ret == cudaSuccess) {
92 return true;
93 } else {
94 return false;
95 }
96 } else {
97 return true;
98 }
99 }
100
Create(void * addr,const std::vector<size_t> & shape,const size_t & capacity)101 BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity) {
102 if (addr == nullptr) {
103 MS_LOG(ERROR) << "addr is nullptr";
104 return INTERNAL_ERROR;
105 }
106 queue_ = std::make_shared<GpuQueue>(addr, shape, capacity);
107 return SUCCESS;
108 }
109
// Forwards the host-buffer release callback to the underlying GpuQueue;
// GpuQueue::Front invokes it per item once the device copy has completed.
// NOTE(review): queue_ is dereferenced unchecked -- Create() must have
// succeeded before this is called.
void BlockingQueue::RegisterRelease(const std::function<void(void *, int32_t)> &func) { queue_->RegisterRelease(func); }
111
Push(const std::vector<DataItemGpu> & data,unsigned int)112 BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataItemGpu> &data, unsigned int) {
113 std::unique_lock<std::mutex> locker(mutex_);
114 if (queue_->IsFull()) {
115 if (not_full_cond_.wait_for(locker, std::chrono::microseconds(kTimeout)) == std::cv_status::timeout) {
116 return TIMEOUT;
117 }
118 }
119 auto ret = queue_->Push(data);
120 if (ret) {
121 return ret;
122 }
123 not_empty_cond_.notify_one();
124 return SUCCESS;
125 }
126
Front(void ** addr,size_t * len)127 BlockQueueStatus_T BlockingQueue::Front(void **addr, size_t *len) {
128 std::unique_lock<std::mutex> locker(mutex_);
129 bool timeout = not_empty_cond_.wait_for(locker, std::chrono::seconds(30), [this] { return !queue_->IsEmpty(); });
130 if (!timeout) {
131 return TIMEOUT;
132 }
133
134 return queue_->Front(addr, len);
135 }
136
Pop()137 BlockQueueStatus_T BlockingQueue::Pop() {
138 std::unique_lock<std::mutex> locker(mutex_);
139 not_empty_cond_.wait(locker, [this] { return !queue_->IsEmpty(); });
140 auto ret = queue_->Pop();
141 if (ret) {
142 return ret;
143 }
144 not_full_cond_.notify_one();
145 return SUCCESS;
146 }
147
Destroy()148 bool BlockingQueue::Destroy() {
149 if (queue_ != nullptr) {
150 return queue_->Destroy();
151 } else {
152 return true;
153 }
154 }
155 } // namespace device
156 } // namespace mindspore
157