• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
#include "minddata/dataset/engine/dataset_iterator.h"
#include <algorithm>
#include <string>
#include <unordered_map>
#include <utility>
#include "minddata/dataset/core/data_type.h"
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/core/tensor_shape.h"

#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#ifndef ENABLE_SECURITY
#include "minddata/dataset/engine/perf/profiling.h"
#endif
30 
31 namespace mindspore {
32 namespace dataset {
33 // Fetches one row of data from the iterator as a column map.
GetNextAsMap(TensorMap * out_map)34 Status DatasetIterator::GetNextAsMap(TensorMap *out_map) {
35   if (out_map == nullptr) {
36     RETURN_STATUS_UNEXPECTED("Null output map in iterator!");
37   }
38 
39   out_map->clear();
40 
41   TensorRow curr_row;
42   MS_LOG(INFO) << "get next as map start.";
43   RETURN_IF_NOT_OK(FetchNextTensorRow(&curr_row));
44   MS_LOG(INFO) << "fetchNextTensor success.";
45 
46   // Return empty map if there's no data
47   if (curr_row.empty()) {
48     return Status::OK();
49   }
50 
51   // The column name mapping is needed to be able to produce the tensor map output.
52   // The column name mapping comes from the source operator that is producing the data into the iterator.
53   // To avoid having to fetch this for every time, we'll take a local copy of the column name id mapping
54   // and save in the iterator.  We only have to do this once.  All subsequent iterations use the same mapping.
55   if (col_name_id_map_.empty()) {
56     // Determine the column name map by calling the derived class method to retrieve the column
57     // name map
58     col_name_id_map_ = this->GetColumnNameMap();
59   }
60 
61   // Populate the out map from the row and return it
62   for (const auto &colMap : col_name_id_map_) {
63     std::string column_name = colMap.first;
64     // Need to filter meta column start with kDftMetaColumnPrefix
65     size_t pos = column_name.find(kDftMetaColumnPrefix);
66     if (pos != std::string::npos && pos == 0) {
67       continue;
68     }
69     (*out_map)[colMap.first] = std::move(curr_row[colMap.second]);
70   }
71   if (out_map->size() == 0) {
72     std::string err_msg = "No effective column found, maybe all columns are meta column and will be filtered. ";
73     err_msg += "If you want to output meta column please rename column name to a new one which is not start with ";
74     err_msg += "\"" + std::string(kDftMetaColumnPrefix) + "\"";
75     RETURN_STATUS_UNEXPECTED(err_msg);
76   }
77 
78   return Status::OK();
79 }
80 // Constructor of the DatasetIterator
DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)81 DatasetIterator::DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)
82     : root_(exe_tree->root()),
83 #ifndef ENABLE_SECURITY
84       tracing_(nullptr),
85 #endif
86       cur_batch_num_(0),
87       cur_connector_size_(0),
88       cur_connector_capacity_(0),
89       eof_handled_(false) {
90   std::shared_ptr<Tracing> node;
91 #ifndef ENABLE_SECURITY
92   Status s = GlobalContext::profiling_manager()->GetTracingNode(kDatasetIteratorTracingName, &node);
93   if (s.IsOk()) {
94     tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(node);
95   }
96 #endif
97 }
98 
99 DatasetIterator::~DatasetIterator() = default;
100 
101 // Fetches one row of data from the iterator.  Overrides the base class.  This one fetches
102 // from the tree root node directly.
FetchNextTensorRow(TensorRow * out_row)103 Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
104   if (out_row == nullptr) {
105     RETURN_STATUS_UNEXPECTED("Null output row in iterator!");
106   }
107   // clear the old tensor row
108   out_row->clear();
109 #ifndef ENABLE_SECURITY
110   bool is_profiling_enable = GlobalContext::profiling_manager()->IsProfilingEnable(root_->Tree());
111 #endif
112   // Once eof is handled, always return empty row.  Class must be destroyed and recreated if you
113   // want to iterate again.
114   if (eof_handled_) {
115     std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
116     RETURN_STATUS_UNEXPECTED(err);
117   }
118 #ifndef ENABLE_SECURITY
119   if (tracing_ != nullptr) {
120     cur_connector_size_ = root_->ConnectorSize();
121     cur_connector_capacity_ = root_->ConnectorCapacity();
122   }
123 #endif
124   RETURN_IF_NOT_OK(root_->GetNextRow(out_row));
125 
126   // Since GetNextRow was used rather than GetNextInput(), it means we need to manually
127   // handle eoe and eof messages here.
128   //
129   // An eoe row means we have iterated an epoch.
130   // The next row in the pipeline might be an EOF or a TensorRow for next epoch
131   if (out_row->eoe()) {
132     MS_LOG(INFO) << "End of data iteration.  cur_batch_num_: " << cur_batch_num_;
133 #ifndef ENABLE_SECURITY
134     if (is_profiling_enable) {
135       root_->Tree()->SetEpochEnd();
136       GlobalContext::profiling_manager()->RecordEndOfEpoch(static_cast<uint32_t>(cur_batch_num_));
137     }
138 #endif
139     return Status::OK();
140   }
141 
142   // An eof buffer means it is the end of execution and all operators are shutting down.
143   // Because there is no more data to return to the caller, this will change `eof_handled_` state and
144   // returns status unexpected error.
145   if (out_row->eof()) {
146     eof_handled_ = true;
147     root_->Tree()->SetFinished();
148     std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
149     RETURN_STATUS_UNEXPECTED(err);
150   }
151 #ifndef ENABLE_SECURITY
152   if (tracing_ != nullptr) {
153     cur_batch_num_++;
154     tracing_->Record(static_cast<int32_t>(CONNECTOR_DEPTH), cur_connector_capacity_, cur_batch_num_,
155                      cur_connector_size_, ProfilingTime::GetCurMilliSecond());
156   }
157 #endif
158   return Status::OK();
159 }
160 
161 // Getter
GetColumnNameMap() const162 std::unordered_map<std::string, int32_t> DatasetIterator::GetColumnNameMap() const {
163   return root_->column_name_id_map();
164 }
165 
166 // Constructor of the ChildIterator
ChildIterator(DatasetOp * current_op,int32_t worker_id,int32_t child_idx)167 ChildIterator::ChildIterator(DatasetOp *current_op, int32_t worker_id, int32_t child_idx)
168     : current_op_(current_op), child_idx_(child_idx), worker_id_(worker_id), end_epoch_(false), eof_handled_(false) {}
169 
~ChildIterator()170 ChildIterator::~ChildIterator() { current_op_ = nullptr; }
171 
172 // Fetches one row of data from the iterator.  Overrides the base class.  This one fetches
173 // only from the child/worker id as given from the constructor.
FetchNextTensorRow(TensorRow * out_row)174 Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) {
175   RETURN_UNEXPECTED_IF_NULL(out_row);
176   // clear the old tensor row
177   out_row->clear();
178 
179   // Once eof is handled, always return empty row.  Class must be destroyed and recreated if you
180   // want to iterate again.
181   if (eof_handled_) {
182     std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
183     RETURN_STATUS_UNEXPECTED(err);
184   }
185 
186   RETURN_IF_NOT_OK(CollectOpInfoStart(current_op_->NameWithID(), "GetFromPreviousOp"));
187   RETURN_IF_NOT_OK(current_op_->child(child_idx_)->GetNextRow(out_row));
188   RETURN_IF_NOT_OK(
189     CollectOpInfoEnd(current_op_->NameWithID(), "GetFromPreviousOp", {{"TensorRowFlags", out_row->FlagName()}}));
190 
191   // If an eoe is picked up here, we simply return an empty vector and it's up to the
192   // caller to decide what it wants to do next.TensorRow
193   if (out_row->eoe()) {
194     MS_LOG(DEBUG) << "(" << current_op_->NameWithID() << ", " << child_idx_ << ")"
195                   << "Child iterator picked up EOE.";
196     end_epoch_ = true;
197     return Status::OK();
198   } else {
199     end_epoch_ = false;
200   }
201 
202   if (out_row->eof()) {
203     MS_LOG(DEBUG) << "(" << current_op_->NameWithID() << ", " << child_idx_ << ")"
204                   << "Child iterator picked up EOF.";
205     eof_handled_ = true;
206     *out_row = TensorRow(TensorRow::kFlagEOF);
207   }
208   return Status::OK();
209 }
210 
211 // drain till the next eoe
Drain()212 Status ChildIterator::Drain() {
213   if (end_epoch_ == true) {
214     // Calling drain against a child that is already at it's eoe state will not result in any action.
215     // This allows you to do:
216     // - fetch until empty row
217     // - drain (will not actually drain because you are already at the end of the iteration)
218     // However, the next time after that, it will perform it's normal draining activities.
219     end_epoch_ = false;
220     MS_LOG(DEBUG) << "No operation drain, already at end of epoch.";
221     return Status::OK();
222   }
223   MS_LOG(DEBUG) << "Child draining buffers until eoe.";
224   TensorRow row;
225   // else we drain until eoe or eof, eof here is for sanity check
226   while (!row.eoe() && !row.eof()) {
227     RETURN_IF_NOT_OK(current_op_->child(child_idx_)->GetNextRow(&row));
228   }
229   if (row.eof()) {
230     RETURN_STATUS_UNEXPECTED("Child iterator picked up EOF in drain.");
231   }
232   return Status::OK();
233 }
234 
235 // Getter
GetColumnNameMap() const236 std::unordered_map<std::string, int32_t> ChildIterator::GetColumnNameMap() const {
237   return current_op_->child(child_idx_)->column_name_id_map();
238 }
239 }  // namespace dataset
240 }  // namespace mindspore
241