1 /**
2 * Copyright 2019-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/dataset_iterator.h"
#include <algorithm>
#include <string>
#include <unordered_map>
#include <utility>
20 #include "minddata/dataset/core/data_type.h"
21 #include "minddata/dataset/core/tensor.h"
22 #include "minddata/dataset/core/tensor_shape.h"
23
24 #include "minddata/dataset/engine/execution_tree.h"
25 #include "minddata/dataset/util/status.h"
26 #include "minddata/dataset/engine/datasetops/dataset_op.h"
27 #ifndef ENABLE_SECURITY
28 #include "minddata/dataset/engine/perf/profiling.h"
29 #endif
30
31 namespace mindspore {
32 namespace dataset {
33 // Fetches one row of data from the iterator as a column map.
GetNextAsMap(TensorMap * out_map)34 Status DatasetIterator::GetNextAsMap(TensorMap *out_map) {
35 if (out_map == nullptr) {
36 RETURN_STATUS_UNEXPECTED("Null output map in iterator!");
37 }
38
39 out_map->clear();
40
41 TensorRow curr_row;
42 MS_LOG(INFO) << "get next as map start.";
43 RETURN_IF_NOT_OK(FetchNextTensorRow(&curr_row));
44 MS_LOG(INFO) << "fetchNextTensor success.";
45
46 // Return empty map if there's no data
47 if (curr_row.empty()) {
48 return Status::OK();
49 }
50
51 // The column name mapping is needed to be able to produce the tensor map output.
52 // The column name mapping comes from the source operator that is producing the data into the iterator.
53 // To avoid having to fetch this for every time, we'll take a local copy of the column name id mapping
54 // and save in the iterator. We only have to do this once. All subsequent iterations use the same mapping.
55 if (col_name_id_map_.empty()) {
56 // Determine the column name map by calling the derived class method to retrieve the column
57 // name map
58 col_name_id_map_ = this->GetColumnNameMap();
59 }
60
61 // Populate the out map from the row and return it
62 for (const auto &colMap : col_name_id_map_) {
63 std::string column_name = colMap.first;
64 // Need to filter meta column start with kDftMetaColumnPrefix
65 size_t pos = column_name.find(kDftMetaColumnPrefix);
66 if (pos != std::string::npos && pos == 0) {
67 continue;
68 }
69 (*out_map)[colMap.first] = std::move(curr_row[colMap.second]);
70 }
71 if (out_map->size() == 0) {
72 std::string err_msg = "No effective column found, maybe all columns are meta column and will be filtered. ";
73 err_msg += "If you want to output meta column please rename column name to a new one which is not start with ";
74 err_msg += "\"" + std::string(kDftMetaColumnPrefix) + "\"";
75 RETURN_STATUS_UNEXPECTED(err_msg);
76 }
77
78 return Status::OK();
79 }
80 // Constructor of the DatasetIterator
DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)81 DatasetIterator::DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)
82 : root_(exe_tree->root()),
83 #ifndef ENABLE_SECURITY
84 tracing_(nullptr),
85 #endif
86 cur_batch_num_(0),
87 cur_connector_size_(0),
88 cur_connector_capacity_(0),
89 eof_handled_(false) {
90 std::shared_ptr<Tracing> node;
91 #ifndef ENABLE_SECURITY
92 Status s = GlobalContext::profiling_manager()->GetTracingNode(kDatasetIteratorTracingName, &node);
93 if (s.IsOk()) {
94 tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(node);
95 }
96 #endif
97 }
98
99 DatasetIterator::~DatasetIterator() = default;
100
101 // Fetches one row of data from the iterator. Overrides the base class. This one fetches
102 // from the tree root node directly.
FetchNextTensorRow(TensorRow * out_row)103 Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
104 if (out_row == nullptr) {
105 RETURN_STATUS_UNEXPECTED("Null output row in iterator!");
106 }
107 // clear the old tensor row
108 out_row->clear();
109 #ifndef ENABLE_SECURITY
110 bool is_profiling_enable = GlobalContext::profiling_manager()->IsProfilingEnable(root_->Tree());
111 #endif
112 // Once eof is handled, always return empty row. Class must be destroyed and recreated if you
113 // want to iterate again.
114 if (eof_handled_) {
115 std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
116 RETURN_STATUS_UNEXPECTED(err);
117 }
118 #ifndef ENABLE_SECURITY
119 if (tracing_ != nullptr) {
120 cur_connector_size_ = root_->ConnectorSize();
121 cur_connector_capacity_ = root_->ConnectorCapacity();
122 }
123 #endif
124 RETURN_IF_NOT_OK(root_->GetNextRow(out_row));
125
126 // Since GetNextRow was used rather than GetNextInput(), it means we need to manually
127 // handle eoe and eof messages here.
128 //
129 // An eoe row means we have iterated an epoch.
130 // The next row in the pipeline might be an EOF or a TensorRow for next epoch
131 if (out_row->eoe()) {
132 MS_LOG(INFO) << "End of data iteration. cur_batch_num_: " << cur_batch_num_;
133 #ifndef ENABLE_SECURITY
134 if (is_profiling_enable) {
135 root_->Tree()->SetEpochEnd();
136 GlobalContext::profiling_manager()->RecordEndOfEpoch(static_cast<uint32_t>(cur_batch_num_));
137 }
138 #endif
139 return Status::OK();
140 }
141
142 // An eof buffer means it is the end of execution and all operators are shutting down.
143 // Because there is no more data to return to the caller, this will change `eof_handled_` state and
144 // returns status unexpected error.
145 if (out_row->eof()) {
146 eof_handled_ = true;
147 root_->Tree()->SetFinished();
148 std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
149 RETURN_STATUS_UNEXPECTED(err);
150 }
151 #ifndef ENABLE_SECURITY
152 if (tracing_ != nullptr) {
153 cur_batch_num_++;
154 tracing_->Record(static_cast<int32_t>(CONNECTOR_DEPTH), cur_connector_capacity_, cur_batch_num_,
155 cur_connector_size_, ProfilingTime::GetCurMilliSecond());
156 }
157 #endif
158 return Status::OK();
159 }
160
161 // Getter
GetColumnNameMap() const162 std::unordered_map<std::string, int32_t> DatasetIterator::GetColumnNameMap() const {
163 return root_->column_name_id_map();
164 }
165
166 // Constructor of the ChildIterator
ChildIterator(DatasetOp * current_op,int32_t worker_id,int32_t child_idx)167 ChildIterator::ChildIterator(DatasetOp *current_op, int32_t worker_id, int32_t child_idx)
168 : current_op_(current_op), child_idx_(child_idx), worker_id_(worker_id), end_epoch_(false), eof_handled_(false) {}
169
~ChildIterator()170 ChildIterator::~ChildIterator() { current_op_ = nullptr; }
171
172 // Fetches one row of data from the iterator. Overrides the base class. This one fetches
173 // only from the child/worker id as given from the constructor.
FetchNextTensorRow(TensorRow * out_row)174 Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) {
175 RETURN_UNEXPECTED_IF_NULL(out_row);
176 // clear the old tensor row
177 out_row->clear();
178
179 // Once eof is handled, always return empty row. Class must be destroyed and recreated if you
180 // want to iterate again.
181 if (eof_handled_) {
182 std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
183 RETURN_STATUS_UNEXPECTED(err);
184 }
185
186 RETURN_IF_NOT_OK(CollectOpInfoStart(current_op_->NameWithID(), "GetFromPreviousOp"));
187 RETURN_IF_NOT_OK(current_op_->child(child_idx_)->GetNextRow(out_row));
188 RETURN_IF_NOT_OK(
189 CollectOpInfoEnd(current_op_->NameWithID(), "GetFromPreviousOp", {{"TensorRowFlags", out_row->FlagName()}}));
190
191 // If an eoe is picked up here, we simply return an empty vector and it's up to the
192 // caller to decide what it wants to do next.TensorRow
193 if (out_row->eoe()) {
194 MS_LOG(DEBUG) << "(" << current_op_->NameWithID() << ", " << child_idx_ << ")"
195 << "Child iterator picked up EOE.";
196 end_epoch_ = true;
197 return Status::OK();
198 } else {
199 end_epoch_ = false;
200 }
201
202 if (out_row->eof()) {
203 MS_LOG(DEBUG) << "(" << current_op_->NameWithID() << ", " << child_idx_ << ")"
204 << "Child iterator picked up EOF.";
205 eof_handled_ = true;
206 *out_row = TensorRow(TensorRow::kFlagEOF);
207 }
208 return Status::OK();
209 }
210
211 // drain till the next eoe
Drain()212 Status ChildIterator::Drain() {
213 if (end_epoch_ == true) {
214 // Calling drain against a child that is already at it's eoe state will not result in any action.
215 // This allows you to do:
216 // - fetch until empty row
217 // - drain (will not actually drain because you are already at the end of the iteration)
218 // However, the next time after that, it will perform it's normal draining activities.
219 end_epoch_ = false;
220 MS_LOG(DEBUG) << "No operation drain, already at end of epoch.";
221 return Status::OK();
222 }
223 MS_LOG(DEBUG) << "Child draining buffers until eoe.";
224 TensorRow row;
225 // else we drain until eoe or eof, eof here is for sanity check
226 while (!row.eoe() && !row.eof()) {
227 RETURN_IF_NOT_OK(current_op_->child(child_idx_)->GetNextRow(&row));
228 }
229 if (row.eof()) {
230 RETURN_STATUS_UNEXPECTED("Child iterator picked up EOF in drain.");
231 }
232 return Status::OK();
233 }
234
235 // Getter
GetColumnNameMap() const236 std::unordered_map<std::string, int32_t> ChildIterator::GetColumnNameMap() const {
237 return current_op_->child(child_idx_)->column_name_id_map();
238 }
239 } // namespace dataset
240 } // namespace mindspore
241