• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright 2019 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #include "runtime/device/gpu/blocking_queue.h"
18 #include <chrono>
19 #include "runtime/device/gpu/queue_common.h"
20 #include "utils/ms_utils.h"
21 
22 namespace mindspore {
23 namespace device {
24 const size_t kTimeout = 100;
25 
// Constructs a ring buffer of `capacity` slots over pre-allocated device
// memory at `addr`.  `shape` lists the byte length of each tensor making up
// one queue element; one slot spans the sum of those lengths (len_).
// NOTE(review): assumes `addr` points to at least capacity * sum(shape) bytes
// of device memory owned by the caller -- confirm at the allocation site.
GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity)
    : buffer_(addr),
      head_(0),  // index of the next slot to read
      tail_(0),  // index of the next slot to write
      shape_(shape),
      len_(0),   // total bytes per element; accumulated in the body below
      size_(0),  // current number of queued elements
      capacity_(capacity),
      stream_(0),
      node_info_(nullptr) {
  // Dedicated stream on which Push() issues its host-to-device copies.
  CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
  // Per-slot bookkeeping (completion event + host buffer references).
  node_info_ = std::make_unique<NodeInfo[]>(capacity);
  for (auto item : shape) {
    len_ += item;
  }
}
42 
~GpuQueue()43 GpuQueue::~GpuQueue() { buffer_ = nullptr; }
44 
Push(const std::vector<DataItemGpu> & data)45 BlockQueueStatus_T GpuQueue::Push(const std::vector<DataItemGpu> &data) {
46   int offset = 0;
47   for (size_t i = 0; i < data.size(); i++) {
48     auto item = data[i];
49     if (item.data_ptr_ == nullptr || item.data_len_ != shape_[i]) {
50       MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
51       return ERROR_INPUT;
52     }
53 
54     void *addr = reinterpret_cast<unsigned char *>(buffer_) + tail_ * len_ + offset;
55     CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr_, item.data_len_, cudaMemcpyHostToDevice, stream_),
56                               "Cuda Memcpy Error");
57 
58     offset += item.data_len_;
59   }
60 
61   node_info_[tail_].event_.reset(new cudaEvent_t());
62   CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed");
63   CHECK_CUDA_RET_WITH_ERROR(cudaEventRecord(*(node_info_[tail_].event_), stream_), "Cuda Create Event Failed");
64   node_info_[tail_].data_ = data;
65   tail_ = (tail_ + 1) % (capacity_);
66   ++size_;
67   return SUCCESS;
68 }
69 
Front(void ** addr,size_t * len) const70 BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const {
71   CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
72   CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
73   *addr = (unsigned char *)buffer_ + head_ * len_;
74   *len = len_;
75 
76   for (auto item : node_info_[head_].data_) {
77     host_release_(item.data_ptr_, item.worker_id_);
78   }
79   return SUCCESS;
80 }
81 
Pop()82 BlockQueueStatus_T GpuQueue::Pop() {
83   head_ = (head_ + 1) % (capacity_);
84   --size_;
85   return SUCCESS;
86 }
87 
Destroy()88 bool GpuQueue::Destroy() {
89   if (stream_ != nullptr) {
90     auto ret = cudaStreamDestroy(stream_);
91     if (ret == cudaSuccess) {
92       return true;
93     } else {
94       return false;
95     }
96   } else {
97     return true;
98   }
99 }
100 
Create(void * addr,const std::vector<size_t> & shape,const size_t & capacity)101 BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity) {
102   if (addr == nullptr) {
103     MS_LOG(ERROR) << "addr is nullptr";
104     return INTERNAL_ERROR;
105   }
106   queue_ = std::make_shared<GpuQueue>(addr, shape, capacity);
107   return SUCCESS;
108 }
109 
RegisterRelease(const std::function<void (void *,int32_t)> & func)110 void BlockingQueue::RegisterRelease(const std::function<void(void *, int32_t)> &func) { queue_->RegisterRelease(func); }
111 
Push(const std::vector<DataItemGpu> & data,unsigned int)112 BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataItemGpu> &data, unsigned int) {
113   std::unique_lock<std::mutex> locker(mutex_);
114   if (queue_->IsFull()) {
115     if (not_full_cond_.wait_for(locker, std::chrono::microseconds(kTimeout)) == std::cv_status::timeout) {
116       return TIMEOUT;
117     }
118   }
119   auto ret = queue_->Push(data);
120   if (ret) {
121     return ret;
122   }
123   not_empty_cond_.notify_one();
124   return SUCCESS;
125 }
126 
Front(void ** addr,size_t * len)127 BlockQueueStatus_T BlockingQueue::Front(void **addr, size_t *len) {
128   std::unique_lock<std::mutex> locker(mutex_);
129   bool timeout = not_empty_cond_.wait_for(locker, std::chrono::seconds(30), [this] { return !queue_->IsEmpty(); });
130   if (!timeout) {
131     return TIMEOUT;
132   }
133 
134   return queue_->Front(addr, len);
135 }
136 
Pop()137 BlockQueueStatus_T BlockingQueue::Pop() {
138   std::unique_lock<std::mutex> locker(mutex_);
139   not_empty_cond_.wait(locker, [this] { return !queue_->IsEmpty(); });
140   auto ret = queue_->Pop();
141   if (ret) {
142     return ret;
143   }
144   not_full_cond_.notify_one();
145   return SUCCESS;
146 }
147 
Destroy()148 bool BlockingQueue::Destroy() {
149   if (queue_ != nullptr) {
150     return queue_->Destroy();
151   } else {
152     return true;
153   }
154 }
155 }  // namespace device
156 }  // namespace mindspore
157