• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_
18 
19 #ifdef ENABLE_GPUQUE
20 #include <memory>
21 #include <string>
22 #include <utility>
23 #include <vector>
24 #include "minddata/dataset/engine/connector.h"
25 #include "minddata/dataset/util/status.h"
26 #include "minddata/dataset/include/dataset/constants.h"
27 #include "runtime/device/gpu/blocking_queue.h"
28 
29 using mindspore::device::DataItemGpu;
30 
31 namespace mindspore {
32 namespace dataset {
33 class GpuItemConnector : public Connector<std::vector<device::DataItemGpu>> {
34  public:
GpuItemConnector(int32_t num_producers,int32_t num_consumers,int32_t queue_capacity)35   GpuItemConnector(int32_t num_producers, int32_t num_consumers, int32_t queue_capacity)
36       : Connector<std::vector<device::DataItemGpu>>(num_producers, num_consumers, queue_capacity) {
37     for (int i = 0; i < num_producers; i++) {
38       is_queue_finished_.push_back(false);
39     }
40   }
41 
42   ~GpuItemConnector() = default;
43 
Add(int32_t worker_d,std::vector<device::DataItemGpu> && element)44   Status Add(int32_t worker_d, std::vector<device::DataItemGpu> &&element) noexcept {
45     return Connector<std::vector<device::DataItemGpu>>::Push(worker_d, std::move(element));
46   }
47 
Pop(int32_t worker_id,std::vector<device::DataItemGpu> * result)48   Status Pop(int32_t worker_id, std::vector<device::DataItemGpu> *result) noexcept override {
49     RETURN_UNEXPECTED_IF_NULL(result);
50     {
51       MS_ASSERT(worker_id < num_consumers_);
52       std::unique_lock<std::mutex> lock(m_);
53       RETURN_IF_NOT_OK(cv_.Wait(&lock, [this, worker_id]() { return expect_consumer_ == worker_id; }));
54       if (is_queue_finished_[pop_from_]) {
55         std::string errMsg = "ERROR: popping from a finished queue in GpuItemConnector";
56         RETURN_STATUS_UNEXPECTED(errMsg);
57       }
58 
59       RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result));
60       if ((*result).empty()) {
61         is_queue_finished_[pop_from_] = true;
62       }
63 
64       for (int offset = 1; offset <= num_producers_; offset++) {
65         int32_t nextQueueIndex = (pop_from_ + offset) % num_producers_;
66         if (is_queue_finished_[nextQueueIndex] == false) {
67           pop_from_ = nextQueueIndex;
68           break;
69         }
70       }
71 
72       expect_consumer_ = (expect_consumer_ + 1) % num_consumers_;
73     }
74 
75     cv_.NotifyAll();
76     return Status::OK();
77   }
78 
79  private:
80   std::vector<bool> is_queue_finished_;
81 };
82 }  // namespace dataset
83 }  // namespace mindspore
84 #endif  // ENABLE_GPUQUE
85 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_GPU_ITEM_CONNECTOR_H_
86