• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "minddata/dataset/engine/datasetops/project_op.h"
18 #include <algorithm>
19 #include <iostream>
20 #include <string>
21 #include <unordered_map>
22 #include <vector>
23 
24 #include "minddata/dataset/engine/execution_tree.h"
25 #include "minddata/dataset/util/log_adapter.h"
26 
27 namespace mindspore {
28 namespace dataset {
ProjectOp(const std::vector<std::string> & columns_to_project)29 ProjectOp::ProjectOp(const std::vector<std::string> &columns_to_project)
30     : PipelineOp(0), columns_to_project_(columns_to_project) {}
31 
Print(std::ostream & out,bool show_all) const32 void ProjectOp::Print(std::ostream &out, bool show_all) const {
33   if (!show_all) {
34     // Call the super class for displaying any common 1-liner info
35     PipelineOp::Print(out, show_all);
36     // Then show any custom derived-internal 1-liner info for this op
37     out << "\n";
38   } else {
39     // Call the super class for displaying any common detailed info
40     PipelineOp::Print(out, show_all);
41     // Then show any custom derived-internal stuff
42     out << "\nColumns that are projected:";
43     for (size_t i = 0; i < columns_to_project_.size(); i++) {
44       out << "\n" << columns_to_project_[i];
45     }
46     out << "\n\n";
47   }
48 }
49 
50 // Gets a row from the child operator and projects the buffer.
GetNextRow(TensorRow * row,int32_t worker_id,bool retry_if_eoe)51 Status ProjectOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) {
52   RETURN_IF_NOT_OK(child_[0]->GetNextRow(row, worker_id, retry_if_eoe));
53   if (!row->eoe() && !row->eof()) {
54     *row = Project(*row);
55   }
56   if (row->eoe()) {
57     UpdateRepeatAndEpochCounter();
58   }
59   return Status::OK();
60 }
61 
// Builds a new row that holds, in order, the entries of `row` selected by projected_column_indices_.
// \param[in] row The row to pick columns from.
// \return TensorRow The projected row, with its path reset to an empty list.
TensorRow ProjectOp::Project(const TensorRow &row) {
  TensorRow projected;
  for (uint32_t source_index : projected_column_indices_) {
    projected.push_back(row[source_index]);
  }
  // Now if columns changed after map, we don't know which column we should keep,
  // so temporarily we don't support print file_path after ProjectOp.
  projected.setPath({});
  return projected;
}
71 
72 // Class functor operator () override.
73 // Most dataset ops operate by launching a thread (see ExecutionTree).
74 // However, the ProjectOp is defined as a inlined operator, so it is invalid to launch the
75 // functor since this op runs inlined inside another operator. The function is overloaded to
76 // ensure that it is not called by mistake (it will generate an error).
operator ()()77 Status ProjectOp::operator()() { RETURN_STATUS_UNEXPECTED("Logic error. ProjectOp is an inlined operator."); }
78 
NumConsumers() const79 int32_t ProjectOp::NumConsumers() const {
80   if (parent_.empty()) {
81     MS_LOG(DEBUG) << "Project operator, no parent node, assuming it's the root and returning 1.";
82     return 1;
83   } else if (parent_[0] == nullptr) {
84     MS_LOG(DEBUG) << "Project operator, pointer to the first parent is null. Returning 0.";
85     return 0;
86   } else {
87     return parent_[0]->NumConsumers();
88   }
89 }
90 
NumProducers() const91 int32_t ProjectOp::NumProducers() const {
92   if (child_.empty() || child_[0] == nullptr) {
93     MS_LOG(DEBUG) << "Project operator, pointer to child node is null. Returning 0.";
94     return 0;
95   } else {
96     return child_[0]->NumProducers();
97   }
98 }
99 
EoeReceived(int32_t worker_id)100 Status ProjectOp::EoeReceived(int32_t worker_id) {
101   state_ = OpState::kDeOpIdle;
102   return Status::OK();
103 }
104 
EofReceived(int32_t worker_id)105 Status ProjectOp::EofReceived(int32_t worker_id) { return Status::OK(); }
106 
107 // Compute the column map and save it into our own column name map
108 // We cannot use the super class ComputeColMap here because we're making a modification of the
109 // map from the child map.
ComputeColMap()110 Status ProjectOp::ComputeColMap() {
111   if (column_name_id_map_.empty()) {
112     std::unordered_map<std::string, int32_t> child_column_name_mapping = child_[0]->column_name_id_map();
113     for (size_t i = 0; i < columns_to_project_.size(); i++) {
114       std::string &current_column = columns_to_project_[i];
115       if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) {
116         std::string err_msg = "Invalid parameter, column name: " + current_column + " does not exist in dataset.";
117         RETURN_STATUS_UNEXPECTED(err_msg);
118       }
119       // Setup the new column name mapping for ourself (base class field)
120       column_name_id_map_[current_column] = i;
121       projected_column_indices_.push_back(child_column_name_mapping[current_column]);
122     }
123   } else {
124     MS_LOG(WARNING) << "Column name map is already set!";
125   }
126   return Status::OK();
127 }
128 
GetNextRowPullMode(TensorRow * const row)129 Status ProjectOp::GetNextRowPullMode(TensorRow *const row) {
130   RETURN_IF_NOT_OK(ComputeColMap());
131   TensorRow new_row;
132   RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(&new_row));
133   (void)std::transform(projected_column_indices_.begin(), projected_column_indices_.end(), std::back_inserter(*row),
134                        [&new_row](uint32_t x) { return new_row[x]; });
135   // Now if columns changed after map, we don't know which column we should keep,
136   // so temporarily we don't support print file_path after ProjectOp.
137   new_row.setPath({});
138   return Status::OK();
139 }
140 }  // namespace dataset
141 }  // namespace mindspore
142