1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "minddata/dataset/engine/datasetops/project_op.h"
18 #include <algorithm>
19 #include <iostream>
20 #include <string>
21 #include <unordered_map>
22 #include <vector>
23
24 #include "minddata/dataset/engine/execution_tree.h"
25 #include "minddata/dataset/util/log_adapter.h"
26
27 namespace mindspore {
28 namespace dataset {
ProjectOp(const std::vector<std::string> & columns_to_project)29 ProjectOp::ProjectOp(const std::vector<std::string> &columns_to_project)
30 : PipelineOp(0), columns_to_project_(columns_to_project) {}
31
Print(std::ostream & out,bool show_all) const32 void ProjectOp::Print(std::ostream &out, bool show_all) const {
33 if (!show_all) {
34 // Call the super class for displaying any common 1-liner info
35 PipelineOp::Print(out, show_all);
36 // Then show any custom derived-internal 1-liner info for this op
37 out << "\n";
38 } else {
39 // Call the super class for displaying any common detailed info
40 PipelineOp::Print(out, show_all);
41 // Then show any custom derived-internal stuff
42 out << "\nColumns that are projected:";
43 for (size_t i = 0; i < columns_to_project_.size(); i++) {
44 out << "\n" << columns_to_project_[i];
45 }
46 out << "\n\n";
47 }
48 }
49
50 // Gets a row from the child operator and projects the buffer.
GetNextRow(TensorRow * row,int32_t worker_id,bool retry_if_eoe)51 Status ProjectOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) {
52 RETURN_IF_NOT_OK(child_[0]->GetNextRow(row, worker_id, retry_if_eoe));
53 if (!row->eoe() && !row->eof()) {
54 *row = Project(*row);
55 }
56 if (row->eoe()) {
57 UpdateRepeatAndEpochCounter();
58 }
59 return Status::OK();
60 }
61
// Returns a new row holding only the projected columns of `row`, in projection order.
// The entries of projected_column_indices_ are the child's column indices
// computed by ComputeColMap().
TensorRow ProjectOp::Project(const TensorRow &row) {
  TensorRow projected;
  for (const auto child_index : projected_column_indices_) {
    projected.push_back(row[static_cast<uint32_t>(child_index)]);
  }
  // After a projection (possibly following a map) it is unknown which column a file
  // path would belong to, so file paths are not carried past a ProjectOp for now.
  projected.setPath({});
  return projected;
}
71
72 // Class functor operator () override.
73 // Most dataset ops operate by launching a thread (see ExecutionTree).
74 // However, the ProjectOp is defined as a inlined operator, so it is invalid to launch the
75 // functor since this op runs inlined inside another operator. The function is overloaded to
76 // ensure that it is not called by mistake (it will generate an error).
operator ()()77 Status ProjectOp::operator()() { RETURN_STATUS_UNEXPECTED("Logic error. ProjectOp is an inlined operator."); }
78
NumConsumers() const79 int32_t ProjectOp::NumConsumers() const {
80 if (parent_.empty()) {
81 MS_LOG(DEBUG) << "Project operator, no parent node, assuming it's the root and returning 1.";
82 return 1;
83 } else if (parent_[0] == nullptr) {
84 MS_LOG(DEBUG) << "Project operator, pointer to the first parent is null. Returning 0.";
85 return 0;
86 } else {
87 return parent_[0]->NumConsumers();
88 }
89 }
90
NumProducers() const91 int32_t ProjectOp::NumProducers() const {
92 if (child_.empty() || child_[0] == nullptr) {
93 MS_LOG(DEBUG) << "Project operator, pointer to child node is null. Returning 0.";
94 return 0;
95 } else {
96 return child_[0]->NumProducers();
97 }
98 }
99
EoeReceived(int32_t worker_id)100 Status ProjectOp::EoeReceived(int32_t worker_id) {
101 state_ = OpState::kDeOpIdle;
102 return Status::OK();
103 }
104
EofReceived(int32_t worker_id)105 Status ProjectOp::EofReceived(int32_t worker_id) { return Status::OK(); }
106
107 // Compute the column map and save it into our own column name map
108 // We cannot use the super class ComputeColMap here because we're making a modification of the
109 // map from the child map.
ComputeColMap()110 Status ProjectOp::ComputeColMap() {
111 if (column_name_id_map_.empty()) {
112 std::unordered_map<std::string, int32_t> child_column_name_mapping = child_[0]->column_name_id_map();
113 for (size_t i = 0; i < columns_to_project_.size(); i++) {
114 std::string ¤t_column = columns_to_project_[i];
115 if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) {
116 std::string err_msg = "Invalid parameter, column name: " + current_column + " does not exist in dataset.";
117 RETURN_STATUS_UNEXPECTED(err_msg);
118 }
119 // Setup the new column name mapping for ourself (base class field)
120 column_name_id_map_[current_column] = i;
121 projected_column_indices_.push_back(child_column_name_mapping[current_column]);
122 }
123 } else {
124 MS_LOG(WARNING) << "Column name map is already set!";
125 }
126 return Status::OK();
127 }
128
GetNextRowPullMode(TensorRow * const row)129 Status ProjectOp::GetNextRowPullMode(TensorRow *const row) {
130 RETURN_IF_NOT_OK(ComputeColMap());
131 TensorRow new_row;
132 RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(&new_row));
133 (void)std::transform(projected_column_indices_.begin(), projected_column_indices_.end(), std::back_inserter(*row),
134 [&new_row](uint32_t x) { return new_row[x]; });
135 // Now if columns changed after map, we don't know which column we should keep,
136 // so temporarily we don't support print file_path after ProjectOp.
137 new_row.setPath({});
138 return Status::OK();
139 }
140 } // namespace dataset
141 } // namespace mindspore
142