1 /**
2 * Copyright 2020 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <set>
18 #include <string>
19 #include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h"
20
21 namespace mindspore {
22 namespace dataset {
23
// Default constructor.
// Compiler-generated: the job is created without being handed a list of tensor operations here.
CpuMapJob::CpuMapJob() = default;
26
// Constructor taking the tensor operations this job will run.
// @param operations - list of TensorOp handles; received by value and moved into the MapJob base
//                     constructor, so the caller's vector is not copied a second time.
CpuMapJob::CpuMapJob(std::vector<std::shared_ptr<TensorOp>> operations) : MapJob(std::move(operations)) {}
29
// Destructor.
// Compiler-generated: this class adds no cleanup of its own.
CpuMapJob::~CpuMapJob() = default;
32
33 // A function to execute a cpu map job
Run(std::vector<TensorRow> in,std::vector<TensorRow> * out)34 Status CpuMapJob::Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) {
35 int32_t num_rows = in.size();
36 for (int32_t row = 0; row < num_rows; row++) {
37 TensorRow input_row = in[row];
38 TensorRow result_row;
39 for (size_t i = 0; i < ops_.size(); i++) {
40 // Call compute function for cpu
41 Status rc = ops_[i]->Compute(input_row, &result_row);
42 if (rc.IsError()) {
43 std::string err_msg = "";
44 std::string op_name = ops_[i]->Name();
45 // Need to remove the suffix "Op" which length is 2
46 std::string abbr_op_name = op_name.substr(0, op_name.length() - 2);
47 err_msg += "map operation: [" + abbr_op_name + "] failed. ";
48 if (input_row.getPath().size() > 0 && !input_row.getPath()[0].empty()) {
49 err_msg += "The corresponding data files: " + input_row.getPath()[0];
50 if (input_row.getPath().size() > 1) {
51 std::set<std::string> path_set;
52 path_set.insert(input_row.getPath()[0]);
53 for (auto j = 1; j < input_row.getPath().size(); j++) {
54 if (!input_row.getPath()[j].empty() && path_set.find(input_row.getPath()[j]) == path_set.end()) {
55 err_msg += ", " + input_row.getPath()[j];
56 path_set.insert(input_row.getPath()[j]);
57 }
58 }
59 }
60 err_msg += ". ";
61 }
62 std::string tensor_err_msg = rc.GetErrDescription();
63 if (rc.GetLineOfCode() < 0) {
64 err_msg += "Error description:\n";
65 }
66 err_msg += tensor_err_msg;
67 rc.SetErrDescription(err_msg);
68 RETURN_IF_NOT_OK(rc);
69 }
70
71 // Assign result_row to to_process for the next TensorOp processing, except for the last TensorOp in the list.
72 if (i + 1 < ops_.size()) {
73 input_row = std::move(result_row);
74 }
75 }
76 out->push_back(std::move(result_row));
77 }
78 return Status::OK();
79 }
80
81 } // namespace dataset
82 } // namespace mindspore
83