• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_JAGGED_CONNECTOR_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_JAGGED_CONNECTOR_H_
18 
19 #include <memory>
20 #include <string>
21 #include <utility>
22 #include <vector>
23 #include "minddata/dataset/engine/connector.h"
24 
25 #include "minddata/dataset/util/status.h"
26 #include "minddata/dataset/include/dataset/constants.h"
27 
28 namespace mindspore {
29 namespace dataset {
30 class JaggedConnector : public Connector<TensorRow> {
31  public:
JaggedConnector(int32_t num_producers,int32_t num_consumers,int32_t queue_capacity)32   JaggedConnector(int32_t num_producers, int32_t num_consumers, int32_t queue_capacity)
33       : Connector<TensorRow>(num_producers, num_consumers, queue_capacity) {
34     for (int i = 0; i < num_producers; i++) {
35       is_queue_finished_.push_back(false);
36     }
37   }
38 
39   ~JaggedConnector() = default;
40 
Add(int32_t worker_d,TensorRow && element)41   Status Add(int32_t worker_d, TensorRow &&element) noexcept {
42     return Connector<TensorRow>::Push(worker_d, std::move(element));
43   }
44 
Pop(int32_t worker_id,TensorRow * result)45   Status Pop(int32_t worker_id, TensorRow *result) noexcept override {
46     RETURN_UNEXPECTED_IF_NULL(result);
47     {
48       MS_ASSERT(worker_id < num_consumers_);
49       std::unique_lock<std::mutex> lock(m_);
50       RETURN_IF_NOT_OK(cv_.Wait(&lock, [this, worker_id]() { return expect_consumer_ == worker_id; }));
51       if (is_queue_finished_[pop_from_]) {
52         std::string errMsg = "ERROR: popping from a finished queue in JaggedConnector";
53         RETURN_STATUS_UNEXPECTED(errMsg);
54       }
55 
56       RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result));
57       if (result != nullptr && result->eoe()) {
58         is_queue_finished_[pop_from_] = true;
59       }
60 
61       for (int offset = 1; offset <= num_producers_; offset++) {
62         int32_t nextQueueIndex = (pop_from_ + offset) % num_producers_;
63         if (is_queue_finished_[nextQueueIndex] == false) {
64           pop_from_ = nextQueueIndex;
65           break;
66         }
67       }
68 
69       expect_consumer_ = (expect_consumer_ + 1) % num_consumers_;
70     }
71 
72     cv_.NotifyAll();
73     return Status::OK();
74   }
75 
DoReset()76   void DoReset() {
77     for (int i = 0; i < is_queue_finished_.size(); i++) {
78       is_queue_finished_[i] = false;
79     }
80 
81     Connector<TensorRow>::Reset();
82   }
83 
84  private:
85   std::vector<bool> is_queue_finished_;
86 };
87 }  // namespace dataset
88 }  // namespace mindspore
89 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_JAGGED_CONNECTOR_H_
90