/**
 * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 #include "minddata/dataset/engine/dataset_iterator.h"
17 #include <algorithm>
18 #include <unordered_map>
19 #include <utility>
20 #include "minddata/dataset/core/data_type.h"
21 #include "minddata/dataset/core/tensor.h"
22 #include "minddata/dataset/core/tensor_shape.h"
23 
24 #include "minddata/dataset/engine/execution_tree.h"
25 #include "minddata/dataset/util/status.h"
26 #include "minddata/dataset/engine/datasetops/dataset_op.h"
27 #ifndef ENABLE_SECURITY
28 #include "minddata/dataset/engine/perf/profiling.h"
29 #endif
30 
31 namespace mindspore {
32 namespace dataset {
33 // Fetches one row of data from the iterator as a column map.
GetNextAsMap(TensorMap * out_map)34 Status DatasetIterator::GetNextAsMap(TensorMap *out_map) {
35   if (out_map == nullptr) {
36     RETURN_STATUS_UNEXPECTED("Null output map in iterator!");
37   }
38 
39   out_map->clear();
40 
41   TensorRow curr_row;
42   MS_LOG(INFO) << "get next as map start.";
43   RETURN_IF_NOT_OK(FetchNextTensorRow(&curr_row));
44   MS_LOG(INFO) << "fetchNextTensor success.";
45 
46   // Return empty map if there's no data
47   if (curr_row.empty()) {
48     return Status::OK();
49   }
50 
51   // The column name mapping is needed to be able to produce the tensor map output.
52   // The column name mapping comes from the source operator that is producing the data into the iterator.
53   // To avoid having to fetch this for every time, we'll take a local copy of the column name id mapping
54   // and save in the iterator.  We only have to do this once.  All subsequent iterations use the same mapping.
55   if (col_name_id_map_.empty()) {
56     // Determine the column name map by calling the derived class method to retrieve the column
57     // name map
58     col_name_id_map_ = this->GetColumnNameMap();
59   }
60 
61   // Populate the out map from the row and return it
62   for (const auto colMap : col_name_id_map_) {
63     std::string column_name = colMap.first;
64     // Need to filter meta column start with kDftMetaColumnPrefix
65     size_t pos = column_name.find(kDftMetaColumnPrefix);
66     if (pos != std::string::npos && pos == 0) {
67       continue;
68     }
69     (*out_map)[colMap.first] = std::move(curr_row[colMap.second]);
70   }
71   if (out_map->size() == 0) {
72     std::string err_msg = "No effective column found, maybe all columns are meta column and will be filtered. ";
73     err_msg += "If you want to output meta column please rename column name to a new one which is not start with ";
74     err_msg += "\"" + std::string(kDftMetaColumnPrefix) + "\"";
75     RETURN_STATUS_UNEXPECTED(err_msg);
76   }
77 
78   return Status::OK();
79 }
80 // Constructor of the DatasetIterator
DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)81 DatasetIterator::DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)
82     : root_(exe_tree->root()),
83 #ifndef ENABLE_SECURITY
84       tracing_(nullptr),
85 #endif
86       cur_batch_num_(0),
87       cur_connector_size_(0),
88       cur_connector_capacity_(0),
89       eof_handled_(false) {
90   std::shared_ptr<Tracing> node;
91 #ifndef ENABLE_SECURITY
92   Status s = exe_tree->GetProfilingManager()->GetTracingNode(kDatasetIteratorTracingName, &node);
93   if (s.IsOk()) {
94     tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(node);
95   }
96 #endif
97 }
98 
99 DatasetIterator::~DatasetIterator() = default;
100 
101 // Fetches one row of data from the iterator.  Overrides the base class.  This one fetches
102 // from the tree root node directly.
FetchNextTensorRow(TensorRow * out_row)103 Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
104   if (out_row == nullptr) {
105     RETURN_STATUS_UNEXPECTED("Null output row in iterator!");
106   }
107   // clear the old tensor row
108   out_row->clear();
109 #ifndef ENABLE_SECURITY
110   bool is_profiling_enable = root_->Tree()->GetProfilingManager()->IsProfilingEnable();
111 #endif
112   // Once eof is handled, always return empty row.  Class must be destroyed and recreated if you
113   // want to iterate again.
114   if (eof_handled_) {
115     std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
116     RETURN_STATUS_UNEXPECTED(err);
117   }
118 #ifndef ENABLE_SECURITY
119   if (tracing_ != nullptr) {
120     cur_connector_size_ = root_->ConnectorSize();
121     cur_connector_capacity_ = root_->ConnectorCapacity();
122   }
123 #endif
124   RETURN_IF_NOT_OK(root_->GetNextRow(out_row));
125 
126   // Since GetNextRow was used rather than GetNextInput(), it means we need to manually
127   // handle eoe and eof messages here.
128   //
129   // An eoe row means we have iterated an epoch.
130   // The next row in the pipeline might be an EOF or a TensorRow for next epoch
131   if (out_row->eoe()) {
132     MS_LOG(INFO) << "End of data iteration.";
133 #ifndef ENABLE_SECURITY
134     if (is_profiling_enable) {
135       root_->Tree()->SetEpochEnd();
136     }
137 #endif
138     return Status::OK();
139   }
140 
141   // An eof buffer means it is the end of execution and all operators are shutting down.
142   // Because there is no more data to return to the caller, this will change `eof_handled_` state and
143   // returns status unexpected error.
144   if (out_row->eof()) {
145     eof_handled_ = true;
146     root_->Tree()->SetFinished();
147     std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
148     RETURN_STATUS_UNEXPECTED(err);
149   }
150 #ifndef ENABLE_SECURITY
151   if (tracing_ != nullptr) {
152     cur_batch_num_++;
153     RETURN_IF_NOT_OK(tracing_->Record(static_cast<int32_t>(CONNECTOR_DEPTH), cur_connector_capacity_, cur_batch_num_,
154                                       cur_connector_size_, ProfilingTime::GetCurMilliSecond()));
155   }
156 #endif
157   return Status::OK();
158 }
159 
160 // Getter
GetColumnNameMap() const161 std::unordered_map<std::string, int32_t> DatasetIterator::GetColumnNameMap() const {
162   return root_->column_name_id_map();
163 }
164 
165 // Constructor of the ChildIterator
ChildIterator(DatasetOp * current_op,int32_t worker_id,int32_t child_idx)166 ChildIterator::ChildIterator(DatasetOp *current_op, int32_t worker_id, int32_t child_idx)
167     : current_op_(current_op), child_idx_(child_idx), worker_id_(worker_id), end_epoch_(false), eof_handled_(false) {}
168 
~ChildIterator()169 ChildIterator::~ChildIterator() { current_op_ = nullptr; }
170 
171 // Fetches one row of data from the iterator.  Overrides the base class.  This one fetches
172 // only from the child/worker id as given from the constructor.
FetchNextTensorRow(TensorRow * out_row)173 Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) {
174   RETURN_UNEXPECTED_IF_NULL(out_row);
175   // clear the old tensor row
176   out_row->clear();
177 
178   // Once eof is handled, always return empty row.  Class must be destroyed and recreated if you
179   // want to iterate again.
180   if (eof_handled_) {
181     std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
182     RETURN_STATUS_UNEXPECTED(err);
183   }
184 
185   RETURN_IF_NOT_OK(current_op_->child(child_idx_)->GetNextRow(out_row, worker_id_));
186   // If an eoe is picked up here, we simply return an empty vector and it's up to the
187   // caller to decide what it wants to do next.TensorRow
188   if (out_row->eoe()) {
189     MS_LOG(DEBUG) << "(" << current_op_->NameWithID() << ", " << child_idx_ << ")"
190                   << "Child iterator picked up EOE.";
191     end_epoch_ = true;
192     return Status::OK();
193   } else {
194     end_epoch_ = false;
195   }
196 
197   if (out_row->eof()) {
198     MS_LOG(DEBUG) << "(" << current_op_->NameWithID() << ", " << child_idx_ << ")"
199                   << "Child iterator picked up EOF.";
200     eof_handled_ = true;
201     *out_row = TensorRow(TensorRow::kFlagEOF);
202   }
203   return Status::OK();
204 }
205 
206 // drain till the next eoe
Drain()207 Status ChildIterator::Drain() {
208   if (end_epoch_ == true) {
209     // Calling drain against a child that is already at it's eoe state will not result in any action.
210     // This allows you to do:
211     // - fetch until empty row
212     // - drain (will not actually drain because you are already at the end of the iteration)
213     // However, the next time after that, it will perform it's normal draining activities.
214     end_epoch_ = false;
215     MS_LOG(DEBUG) << "No operation drain, already at end of epoch.";
216     return Status::OK();
217   }
218   MS_LOG(DEBUG) << "Child draining buffers until eoe.";
219   TensorRow row;
220   // else we drain until eoe or eof, eof here is for sanity check
221   while (!row.eoe() && !row.eof()) {
222     RETURN_IF_NOT_OK(current_op_->child(child_idx_)->GetNextRow(&row, worker_id_));
223   }
224   if (row.eof()) {
225     return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Child iterator picked up EOF in drain.");
226   }
227   return Status::OK();
228 }
229 
230 // Getter
GetColumnNameMap() const231 std::unordered_map<std::string, int32_t> ChildIterator::GetColumnNameMap() const {
232   return current_op_->child(child_idx_)->column_name_id_map();
233 }
234 }  // namespace dataset
235 }  // namespace mindspore
236