/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_

#ifdef ENABLE_GPUQUE
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "minddata/dataset/engine/connector.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/include/dataset/constants.h"
#include "runtime/device/gpu/blocking_queue.h"

using mindspore::device::DataItemGpu;

namespace mindspore {
namespace dataset {
// GpuItemConnector moves vectors of DataItemGpu from multiple dataset producer
// threads to the GPU device queue consumers. Rows are popped from the internal
// per-producer queues in round-robin order; an empty vector popped from a queue
// marks that producer's queue as finished.
class GpuItemConnector : public Connector<std::vector<device::DataItemGpu>> {
 public:
  GpuItemConnector(int32_t num_producers, int32_t num_consumers, int32_t queue_capacity)
      : Connector<std::vector<device::DataItemGpu>>(num_producers, num_consumers, queue_capacity) {
    for (int i = 0; i < num_producers; i++) {
      is_queue_finished_.push_back(false);
    }
  }

  ~GpuItemConnector() = default;

  // Push one row of GPU data items produced by the given worker.
  Status Add(int32_t worker_id, std::vector<device::DataItemGpu> &&element) noexcept {
    return Connector<std::vector<device::DataItemGpu>>::Push(worker_id, std::move(element));
  }

  // Pop the next row for the given consumer, then advance to the next unfinished producer queue.
  Status Pop(int32_t worker_id, std::vector<device::DataItemGpu> *result) noexcept override {
    RETURN_UNEXPECTED_IF_NULL(result);
    {
      MS_ASSERT(worker_id < num_consumers_);
      std::unique_lock<std::mutex> lock(m_);
      // Wait until it is this consumer's turn to pop.
      RETURN_IF_NOT_OK(cv_.Wait(&lock, [this, worker_id]() { return expect_consumer_ == worker_id; }));
      if (is_queue_finished_[pop_from_]) {
        std::string errMsg = "ERROR: popping from a finished queue in GpuItemConnector";
        RETURN_STATUS_UNEXPECTED(errMsg);
      }

      RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result));
      // An empty row signals that this producer queue has no more data.
      if ((*result).empty()) {
        is_queue_finished_[pop_from_] = true;
      }

      // Advance pop_from_ to the next producer queue that has not finished yet.
      for (int offset = 1; offset <= num_producers_; offset++) {
        int32_t nextQueueIndex = (pop_from_ + offset) % num_producers_;
        if (!is_queue_finished_[nextQueueIndex]) {
          pop_from_ = nextQueueIndex;
          break;
        }
      }

      expect_consumer_ = (expect_consumer_ + 1) % num_consumers_;
    }

    cv_.NotifyAll();
    return Status::OK();
  }

 private:
  std::vector<bool> is_queue_finished_;
};
}  // namespace dataset
}  // namespace mindspore
#endif  // ENABLE_GPUQUE
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_
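// ---------------------------------------------------------------------------
// Illustrative usage sketch (not part of the original header; the variable
// names `connector`, `worker_id`, and `consumer_id` are assumptions): a
// producer thread pushes one row of GPU data items and signals end-of-data
// with an empty vector, while a device-queue consumer drains rows in
// round-robin order.
//
//   std::vector<device::DataItemGpu> items = /* ... filled from a tensor row ... */;
//   RETURN_IF_NOT_OK(connector->Add(worker_id, std::move(items)));  // producer: push a row
//   RETURN_IF_NOT_OK(connector->Add(worker_id, {}));                // producer: end-of-data marker
//
//   std::vector<device::DataItemGpu> row;
//   RETURN_IF_NOT_OK(connector->Pop(consumer_id, &row));            // consumer: take next row
//   if (row.empty()) { /* this producer's queue is finished */ }
// ---------------------------------------------------------------------------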