• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <set>
18 #include <string>
19 #include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h"
20 
21 namespace mindspore {
22 namespace dataset {
23 
24 // Constructor
25 CpuMapJob::CpuMapJob() = default;
26 
27 // Constructor
CpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations)28 CpuMapJob::CpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations) : MapJob(std::move(operations)) {}
29 
30 // Destructor
31 CpuMapJob::~CpuMapJob() = default;
32 
33 // A function to execute a cpu map job
Run(std::vector<TensorRow> in,std::vector<TensorRow> * out)34 Status CpuMapJob::Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) {
35   int32_t num_rows = in.size();
36   for (int32_t row = 0; row < num_rows; row++) {
37     TensorRow input_row = in[row];
38     TensorRow result_row;
39     for (size_t i = 0; i < ops_.size(); i++) {
40       // Call compute function for cpu
41       Status rc = ops_[i]->Compute(input_row, &result_row);
42       if (rc.IsError()) {
43         std::string err_msg = "";
44         std::string op_name = ops_[i]->Name();
45         // Need to remove the suffix "Op" which length is 2
46         std::string abbr_op_name = op_name.substr(0, op_name.length() - 2);
47         err_msg += "map operation: [" + abbr_op_name + "] failed. ";
48         if (input_row.getPath().size() > 0 && !input_row.getPath()[0].empty()) {
49           err_msg += "The corresponding data files: " + input_row.getPath()[0];
50           if (input_row.getPath().size() > 1) {
51             std::set<std::string> path_set;
52             path_set.insert(input_row.getPath()[0]);
53             for (auto j = 1; j < input_row.getPath().size(); j++) {
54               if (!input_row.getPath()[j].empty() && path_set.find(input_row.getPath()[j]) == path_set.end()) {
55                 err_msg += ", " + input_row.getPath()[j];
56                 path_set.insert(input_row.getPath()[j]);
57               }
58             }
59           }
60           err_msg += ". ";
61         }
62         std::string tensor_err_msg = rc.GetErrDescription();
63         if (rc.GetLineOfCode() < 0) {
64           err_msg += "Error description:\n";
65         }
66         err_msg += tensor_err_msg;
67         rc.SetErrDescription(err_msg);
68         RETURN_IF_NOT_OK(rc);
69       }
70 
71       // Assign result_row to to_process for the next TensorOp processing, except for the last TensorOp in the list.
72       if (i + 1 < ops_.size()) {
73         input_row = std::move(result_row);
74       }
75     }
76     out->push_back(std::move(result_row));
77   }
78   return Status::OK();
79 }
80 
81 }  // namespace dataset
82 }  // namespace mindspore
83