1 /**
2 * Copyright 2021-2023 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/opt/pre/node_offload_pass.h"
17
18 #include "minddata/dataset/core/config_manager.h"
19 #include "minddata/dataset/engine/ir/datasetops/batch_node.h"
20 #include "minddata/dataset/engine/ir/datasetops/map_node.h"
21 #include "minddata/dataset/kernels/ir/tensor_operation.h"
22
23 namespace mindspore {
24 namespace dataset {
OffloadNodes()25 NodeOffloadPass::OffloadNodes::OffloadNodes() : auto_offload_(GlobalContext::config_manager()->get_auto_offload()) {}
26
27 // Perform MapNode offload check.
Visit(std::shared_ptr<MapNode> node,bool * const modified)28 Status NodeOffloadPass::OffloadNodes::Visit(std::shared_ptr<MapNode> node, bool *const modified) {
29 *modified = false;
30 ManualOffloadMode manual_offload = node->GetOffload();
31 bool offload_successful = false;
32 std::vector<std::string> input_columns = node->InputColumns();
33
34 // Check if the node is set to manually offload, or if auto_offload is enabled while manual offload is not False.
35 if ((manual_offload == ManualOffloadMode::kEnabled) ||
36 ((auto_offload_ == true) && (manual_offload != ManualOffloadMode::kDisabled))) {
37 bool offload_supported = true;
38 if (IS_OUTPUT_ON(mindspore::kInfo)) {
39 std::string operations = "operations=[";
40 auto op_list = node->operations();
41 std::for_each(op_list.begin(), op_list.end(), [&](const auto &op) {
42 op == op_list.back() ? operations += op->Name() : operations += op->Name() + ", ";
43 });
44 operations += "]";
45 MS_LOG(INFO) << "The offload of map(" + operations + ") is true, and heterogeneous acceleration will be enabled.";
46 }
47 // Currently offload not supported for different output_columns.
48 if (input_columns != node->OutputColumns()) {
49 MS_LOG(WARNING) << "Cannot offload map operation with output_columns != input_columns. Turning offload off.";
50 offload_supported = false;
51 }
52
53 // Check if map operation is at the end of the pipeline.
54 for (std::string input_column : input_columns) {
55 if (end_of_pipeline_.find(input_column) != end_of_pipeline_.end()) {
56 // The input column has already appeared in a previous map op.
57 if (end_of_pipeline_[input_column] == false) {
58 MS_LOG(WARNING) << "Map operation is not at the end of the pipeline for the following input column: "
59 << input_column << ". Turning offload off.";
60 offload_supported = false;
61 }
62 } else {
63 // First time seeing input column in a Map Node, add input column to map object.
64 end_of_pipeline_[input_column] = true;
65 }
66 }
67
68 if (offload_supported) {
69 std::vector<std::string> invalid_ops;
70 std::vector<std::shared_ptr<TensorOperation>> temp_operations = node->operations();
71 bool all_valid_ops = true;
72 int last_invalid_op_pos = 1;
73 int pos = 1;
74
75 // Check individual operations to see if they are supported by offload.
76 for (auto operation : temp_operations) {
77 std::string op_name = operation->Name();
78 if (supported_ops_.find(op_name) == supported_ops_.end()) {
79 last_invalid_op_pos = pos;
80 invalid_ops.push_back(op_name);
81 all_valid_ops = false;
82 }
83 pos++;
84 }
85
86 if (all_valid_ops) {
87 // All operations can be offloaded.
88 nodes_to_offload_.push_back(std::static_pointer_cast<DatasetNode>(node));
89 offload_successful = true;
90 } else {
91 // Some operation(s) cannot be offloaded.
92 MS_LOG(INFO)
93 << "In Map Node, offload is set to True, but offload is not supported by the following operation(s): "
94 << invalid_ops;
95
96 // See if the operations can be split into two Map Nodes
97 if (last_invalid_op_pos != static_cast<int>(temp_operations.size())) {
98 MS_LOG(INFO) << "Map operation will be split after " << invalid_ops.back()
99 << ", with the second map operation being offloaded.";
100 std::vector<std::shared_ptr<TensorOperation>> non_offload_ops(
101 temp_operations.begin(), (temp_operations.begin() + last_invalid_op_pos));
102 std::vector<std::shared_ptr<TensorOperation>> offload_ops((temp_operations.begin() + last_invalid_op_pos),
103 temp_operations.end());
104
105 // First set operations to offload_ops to prepare for copy
106 node->setOperations(offload_ops);
107 // Copy node (returns a copy of the node, but without children)
108 std::shared_ptr<DatasetNode> offload_node = node->Copy();
109 // Set the number of parallel workers of the new node to be the same as current one.
110 offload_node = offload_node->SetNumWorkers(node->NumWorkers());
111 node->setOperations(non_offload_ops);
112 // Insert the split offload map node above the original map node in the ir tree.
113 RETURN_IF_NOT_OK(node->InsertAbove(offload_node));
114 // Add the offload map node to nodes_to_offload
115 nodes_to_offload_.push_back(offload_node);
116 }
117 }
118 }
119 }
120 if (!offload_successful) {
121 // Offload of the original node without modification did not take place.
122 // Since map nodes are visited in reverse order, no other map ops for the input_column(s) can be offloaded after
123 // this.
124 for (std::string input_column : input_columns) {
125 end_of_pipeline_[input_column] = false;
126 }
127 }
128 return Status::OK();
129 }
130
131 // constructor
NodeOffloadPass()132 NodeOffloadPass::NodeOffloadPass() {}
133
134 // Walk the tree to collect the nodes to offload, fill the offload_json object, then remove the node.
RunOnTree(std::shared_ptr<DatasetNode> root_ir,bool * const modified)135 Status NodeOffloadPass::RunOnTree(std::shared_ptr<DatasetNode> root_ir, bool *const modified) {
136 MS_LOG(INFO) << "Pre pass: node offload pass started.";
137 // Create the offload node pass which can identify which nodes need to be offloaded.
138 std::unique_ptr<NodeOffloadPass::OffloadNodes> offload_nodes = std::make_unique<NodeOffloadPass::OffloadNodes>();
139 RETURN_IF_NOT_OK(offload_nodes->Run(root_ir, modified));
140
141 // Update modified flag if there were any nodes identified to be offloaded
142 if (offload_nodes->nodes_to_offload().empty() == false) {
143 *modified = true;
144 }
145
146 // Then, execute the offloading of any nodes that were set up to be offloaded
147 for (auto node : offload_nodes->nodes_to_offload()) {
148 RETURN_IF_NOT_OK(node->to_json(&offload_json_));
149 offload_json_["op_type"] = node->Name();
150
151 // Add the single offloaded node to the list of offloaded nodes and remove the node from the ir tree
152 offload_json_list_.push_back(offload_json_);
153 RETURN_IF_NOT_OK(node->Drop());
154 }
155 MS_LOG(INFO) << "Pre pass: offload node removal pass complete.";
156 return Status::OK();
157 }
158 } // namespace dataset
159 } // namespace mindspore
160