1 /** 2 * Copyright 2019 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_BLOCKING_QUEUE_H_ 18 #define MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_BLOCKING_QUEUE_H_ 19 20 #include <unistd.h> 21 #include <cuda_runtime_api.h> 22 #include <iostream> 23 #include <memory> 24 #include <mutex> 25 #include <cstring> 26 #include <string> 27 #include <vector> 28 #include <condition_variable> 29 #include <functional> 30 31 namespace mindspore { 32 namespace device { 33 enum BlockQueueStatus_T : int { SUCCESS = 0, QUEUE_EXIST, HANDLE_NOT_EXIST, ERROR_INPUT, INTERNAL_ERROR, TIMEOUT }; 34 35 struct DataItemGpu { 36 int32_t worker_id_; 37 std::string data_type_; 38 size_t data_len_; 39 void *data_ptr_; 40 }; 41 42 class GpuQueue { 43 public: 44 GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity); 45 virtual ~GpuQueue(); 46 RegisterRelease(const std::function<void (void *,int32_t)> & func)47 void RegisterRelease(const std::function<void(void *, int32_t)> &func) { host_release_ = func; } 48 IsEmpty()49 inline bool IsEmpty() const { return size_ == 0; } IsFull()50 inline bool IsFull() const { return size_ == capacity_; } 51 52 BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data); 53 BlockQueueStatus_T Front(void **ptr, size_t *len) const; 54 BlockQueueStatus_T Pop(); 55 bool Destroy(); Size()56 size_t Size() { return size_; } Capacity()57 size_t Capacity() { return capacity_; } 58 59 private: 60 struct NodeInfo { 61 std::unique_ptr<cudaEvent_t> event_; 62 std::vector<DataItemGpu> data_; 63 }; 64 65 void *buffer_; 66 size_t head_; 67 size_t tail_; 68 std::vector<size_t> shape_; 69 size_t len_; 70 size_t size_; 71 size_t capacity_; 72 cudaStream_t stream_; 73 std::unique_ptr<NodeInfo[]> node_info_; 74 std::function<void(void *, int32_t)> host_release_; 75 76 GpuQueue(const GpuQueue &) = delete; 77 GpuQueue &operator=(const GpuQueue &) = delete; 78 }; 79 80 class BlockingQueue { 81 public: BlockingQueue()82 BlockingQueue() : queue_(nullptr) {} 83 ~BlockingQueue() = default; 84 85 BlockQueueStatus_T Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity); 86 void RegisterRelease(const std::function<void(void *, int32_t)> &func); 87 BlockQueueStatus_T Push(const std::vector<DataItemGpu> &data, unsigned int timeout_in_sec); 88 BlockQueueStatus_T Front(void **ptr, size_t *len); 89 BlockQueueStatus_T Pop(); 90 bool Destroy(); Size()91 size_t Size() { return queue_->Size(); } Capacity()92 size_t Capacity() { return queue_->Capacity(); } 93 94 private: 95 std::mutex mutex_; 96 std::condition_variable not_full_cond_; 97 std::condition_variable not_empty_cond_; 98 std::shared_ptr<GpuQueue> queue_; 99 }; 100 } // namespace device 101 } // namespace mindspore 102 103 #endif // MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_BLOCKING_QUEUE_H_ 104