1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/dataset_iterator.h"
17 #include <algorithm>
18 #include <unordered_map>
19 #include <utility>
20 #include "minddata/dataset/core/data_type.h"
21 #include "minddata/dataset/core/tensor.h"
22 #include "minddata/dataset/core/tensor_shape.h"
23
24 #include "minddata/dataset/engine/execution_tree.h"
25 #include "minddata/dataset/util/status.h"
26 #include "minddata/dataset/engine/datasetops/dataset_op.h"
27 #ifndef ENABLE_SECURITY
28 #include "minddata/dataset/engine/perf/profiling.h"
29 #endif
30
31 namespace mindspore {
32 namespace dataset {
33 // Fetches one row of data from the iterator as a column map.
GetNextAsMap(TensorMap * out_map)34 Status DatasetIterator::GetNextAsMap(TensorMap *out_map) {
35 if (out_map == nullptr) {
36 RETURN_STATUS_UNEXPECTED("Null output map in iterator!");
37 }
38
39 out_map->clear();
40
41 TensorRow curr_row;
42 MS_LOG(INFO) << "get next as map start.";
43 RETURN_IF_NOT_OK(FetchNextTensorRow(&curr_row));
44 MS_LOG(INFO) << "fetchNextTensor success.";
45
46 // Return empty map if there's no data
47 if (curr_row.empty()) {
48 return Status::OK();
49 }
50
51 // The column name mapping is needed to be able to produce the tensor map output.
52 // The column name mapping comes from the source operator that is producing the data into the iterator.
53 // To avoid having to fetch this for every time, we'll take a local copy of the column name id mapping
54 // and save in the iterator. We only have to do this once. All subsequent iterations use the same mapping.
55 if (col_name_id_map_.empty()) {
56 // Determine the column name map by calling the derived class method to retrieve the column
57 // name map
58 col_name_id_map_ = this->GetColumnNameMap();
59 }
60
61 // Populate the out map from the row and return it
62 for (const auto colMap : col_name_id_map_) {
63 std::string column_name = colMap.first;
64 // Need to filter meta column start with kDftMetaColumnPrefix
65 size_t pos = column_name.find(kDftMetaColumnPrefix);
66 if (pos != std::string::npos && pos == 0) {
67 continue;
68 }
69 (*out_map)[colMap.first] = std::move(curr_row[colMap.second]);
70 }
71 if (out_map->size() == 0) {
72 std::string err_msg = "No effective column found, maybe all columns are meta column and will be filtered. ";
73 err_msg += "If you want to output meta column please rename column name to a new one which is not start with ";
74 err_msg += "\"" + std::string(kDftMetaColumnPrefix) + "\"";
75 RETURN_STATUS_UNEXPECTED(err_msg);
76 }
77
78 return Status::OK();
79 }
80 // Constructor of the DatasetIterator
DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)81 DatasetIterator::DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)
82 : root_(exe_tree->root()),
83 #ifndef ENABLE_SECURITY
84 tracing_(nullptr),
85 #endif
86 cur_batch_num_(0),
87 cur_connector_size_(0),
88 cur_connector_capacity_(0),
89 eof_handled_(false) {
90 std::shared_ptr<Tracing> node;
91 #ifndef ENABLE_SECURITY
92 Status s = exe_tree->GetProfilingManager()->GetTracingNode(kDatasetIteratorTracingName, &node);
93 if (s.IsOk()) {
94 tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(node);
95 }
96 #endif
97 }
98
// Destructor of the DatasetIterator. Defaulted: no explicit cleanup is performed here.
DatasetIterator::~DatasetIterator() = default;
100
101 // Fetches one row of data from the iterator. Overrides the base class. This one fetches
102 // from the tree root node directly.
FetchNextTensorRow(TensorRow * out_row)103 Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
104 if (out_row == nullptr) {
105 RETURN_STATUS_UNEXPECTED("Null output row in iterator!");
106 }
107 // clear the old tensor row
108 out_row->clear();
109 #ifndef ENABLE_SECURITY
110 bool is_profiling_enable = root_->Tree()->GetProfilingManager()->IsProfilingEnable();
111 #endif
112 // Once eof is handled, always return empty row. Class must be destroyed and recreated if you
113 // want to iterate again.
114 if (eof_handled_) {
115 std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
116 RETURN_STATUS_UNEXPECTED(err);
117 }
118 #ifndef ENABLE_SECURITY
119 if (tracing_ != nullptr) {
120 cur_connector_size_ = root_->ConnectorSize();
121 cur_connector_capacity_ = root_->ConnectorCapacity();
122 }
123 #endif
124 RETURN_IF_NOT_OK(root_->GetNextRow(out_row));
125
126 // Since GetNextRow was used rather than GetNextInput(), it means we need to manually
127 // handle eoe and eof messages here.
128 //
129 // An eoe row means we have iterated an epoch.
130 // The next row in the pipeline might be an EOF or a TensorRow for next epoch
131 if (out_row->eoe()) {
132 MS_LOG(INFO) << "End of data iteration.";
133 #ifndef ENABLE_SECURITY
134 if (is_profiling_enable) {
135 root_->Tree()->SetEpochEnd();
136 }
137 #endif
138 return Status::OK();
139 }
140
141 // An eof buffer means it is the end of execution and all operators are shutting down.
142 // Because there is no more data to return to the caller, this will change `eof_handled_` state and
143 // returns status unexpected error.
144 if (out_row->eof()) {
145 eof_handled_ = true;
146 root_->Tree()->SetFinished();
147 std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
148 RETURN_STATUS_UNEXPECTED(err);
149 }
150 #ifndef ENABLE_SECURITY
151 if (tracing_ != nullptr) {
152 cur_batch_num_++;
153 RETURN_IF_NOT_OK(tracing_->Record(static_cast<int32_t>(CONNECTOR_DEPTH), cur_connector_capacity_, cur_batch_num_,
154 cur_connector_size_, ProfilingTime::GetCurMilliSecond()));
155 }
156 #endif
157 return Status::OK();
158 }
159
160 // Getter
GetColumnNameMap() const161 std::unordered_map<std::string, int32_t> DatasetIterator::GetColumnNameMap() const {
162 return root_->column_name_id_map();
163 }
164
// Constructor of the ChildIterator.
// Stores the operator pointer, the worker id and the child index as given, and starts with both the
// end-of-epoch and the eof-handled flags cleared.
// @param current_op - Operator whose child this iterator reads from (see FetchNextTensorRow).
// @param worker_id - Worker id passed along to the child's GetNextRow.
// @param child_idx - Index of the child of current_op to read from.
ChildIterator::ChildIterator(DatasetOp *current_op, int32_t worker_id, int32_t child_idx)
    : current_op_(current_op), child_idx_(child_idx), worker_id_(worker_id), end_epoch_(false), eof_handled_(false) {}
168
// Destructor of the ChildIterator. Only resets the operator pointer; the operator itself is not deleted
// here (presumably it is owned elsewhere - confirm against the class declaration).
ChildIterator::~ChildIterator() { current_op_ = nullptr; }
170
171 // Fetches one row of data from the iterator. Overrides the base class. This one fetches
172 // only from the child/worker id as given from the constructor.
FetchNextTensorRow(TensorRow * out_row)173 Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) {
174 RETURN_UNEXPECTED_IF_NULL(out_row);
175 // clear the old tensor row
176 out_row->clear();
177
178 // Once eof is handled, always return empty row. Class must be destroyed and recreated if you
179 // want to iterate again.
180 if (eof_handled_) {
181 std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
182 RETURN_STATUS_UNEXPECTED(err);
183 }
184
185 RETURN_IF_NOT_OK(current_op_->child(child_idx_)->GetNextRow(out_row, worker_id_));
186 // If an eoe is picked up here, we simply return an empty vector and it's up to the
187 // caller to decide what it wants to do next.TensorRow
188 if (out_row->eoe()) {
189 MS_LOG(DEBUG) << "(" << current_op_->NameWithID() << ", " << child_idx_ << ")"
190 << "Child iterator picked up EOE.";
191 end_epoch_ = true;
192 return Status::OK();
193 } else {
194 end_epoch_ = false;
195 }
196
197 if (out_row->eof()) {
198 MS_LOG(DEBUG) << "(" << current_op_->NameWithID() << ", " << child_idx_ << ")"
199 << "Child iterator picked up EOF.";
200 eof_handled_ = true;
201 *out_row = TensorRow(TensorRow::kFlagEOF);
202 }
203 return Status::OK();
204 }
205
206 // drain till the next eoe
Drain()207 Status ChildIterator::Drain() {
208 if (end_epoch_ == true) {
209 // Calling drain against a child that is already at it's eoe state will not result in any action.
210 // This allows you to do:
211 // - fetch until empty row
212 // - drain (will not actually drain because you are already at the end of the iteration)
213 // However, the next time after that, it will perform it's normal draining activities.
214 end_epoch_ = false;
215 MS_LOG(DEBUG) << "No operation drain, already at end of epoch.";
216 return Status::OK();
217 }
218 MS_LOG(DEBUG) << "Child draining buffers until eoe.";
219 TensorRow row;
220 // else we drain until eoe or eof, eof here is for sanity check
221 while (!row.eoe() && !row.eof()) {
222 RETURN_IF_NOT_OK(current_op_->child(child_idx_)->GetNextRow(&row, worker_id_));
223 }
224 if (row.eof()) {
225 return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Child iterator picked up EOF in drain.");
226 }
227 return Status::OK();
228 }
229
230 // Getter
GetColumnNameMap() const231 std::unordered_map<std::string, int32_t> ChildIterator::GetColumnNameMap() const {
232 return current_op_->child(child_idx_)->column_name_id_map();
233 }
234 } // namespace dataset
235 } // namespace mindspore
236