• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/opt/pre/node_offload_pass.h"
17 
18 #include "minddata/dataset/core/config_manager.h"
19 #include "minddata/dataset/engine/ir/datasetops/batch_node.h"
20 #include "minddata/dataset/engine/ir/datasetops/map_node.h"
21 #include "minddata/dataset/kernels/ir/tensor_operation.h"
22 
23 namespace mindspore {
24 namespace dataset {
// Constructor: cache the global auto-offload flag from the ConfigManager once, so each
// per-node Visit() can consult it without re-querying the global configuration.
NodeOffloadPass::OffloadNodes::OffloadNodes() : auto_offload_(GlobalContext::config_manager()->get_auto_offload()) {}
26 
27 // Perform MapNode offload check.
Visit(std::shared_ptr<MapNode> node,bool * const modified)28 Status NodeOffloadPass::OffloadNodes::Visit(std::shared_ptr<MapNode> node, bool *const modified) {
29   *modified = false;
30   ManualOffloadMode manual_offload = node->GetOffload();
31   bool offload_successful = false;
32   std::vector<std::string> input_columns = node->InputColumns();
33 
34   // Check if the node is set to manually offload, or if auto_offload is enabled while manual offload is not False.
35   if ((manual_offload == ManualOffloadMode::kEnabled) ||
36       ((auto_offload_ == true) && (manual_offload != ManualOffloadMode::kDisabled))) {
37     bool offload_supported = true;
38     if (IS_OUTPUT_ON(mindspore::kInfo)) {
39       std::string operations = "operations=[";
40       auto op_list = node->operations();
41       std::for_each(op_list.begin(), op_list.end(), [&](const auto &op) {
42         op == op_list.back() ? operations += op->Name() : operations += op->Name() + ", ";
43       });
44       operations += "]";
45       MS_LOG(INFO) << "The offload of map(" + operations + ") is true, and heterogeneous acceleration will be enabled.";
46     }
47     // Currently offload not supported for different output_columns.
48     if (input_columns != node->OutputColumns()) {
49       MS_LOG(WARNING) << "Cannot offload map operation with output_columns != input_columns. Turning offload off.";
50       offload_supported = false;
51     }
52 
53     // Check if map operation is at the end of the pipeline.
54     for (std::string input_column : input_columns) {
55       if (end_of_pipeline_.find(input_column) != end_of_pipeline_.end()) {
56         // The input column has already appeared in a previous map op.
57         if (end_of_pipeline_[input_column] == false) {
58           MS_LOG(WARNING) << "Map operation is not at the end of the pipeline for the following input column: "
59                           << input_column << ". Turning offload off.";
60           offload_supported = false;
61         }
62       } else {
63         // First time seeing input column in a Map Node, add input column to map object.
64         end_of_pipeline_[input_column] = true;
65       }
66     }
67 
68     if (offload_supported) {
69       std::vector<std::string> invalid_ops;
70       std::vector<std::shared_ptr<TensorOperation>> temp_operations = node->operations();
71       bool all_valid_ops = true;
72       int last_invalid_op_pos = 1;
73       int pos = 1;
74 
75       // Check individual operations to see if they are supported by offload.
76       for (auto operation : temp_operations) {
77         std::string op_name = operation->Name();
78         if (supported_ops_.find(op_name) == supported_ops_.end()) {
79           last_invalid_op_pos = pos;
80           invalid_ops.push_back(op_name);
81           all_valid_ops = false;
82         }
83         pos++;
84       }
85 
86       if (all_valid_ops) {
87         // All operations can be offloaded.
88         nodes_to_offload_.push_back(std::static_pointer_cast<DatasetNode>(node));
89         offload_successful = true;
90       } else {
91         // Some operation(s) cannot be offloaded.
92         MS_LOG(INFO)
93           << "In Map Node, offload is set to True, but offload is not supported by the following operation(s): "
94           << invalid_ops;
95 
96         // See if the operations can be split into two Map Nodes
97         if (last_invalid_op_pos != static_cast<int>(temp_operations.size())) {
98           MS_LOG(INFO) << "Map operation will be split after " << invalid_ops.back()
99                        << ", with the second map operation being offloaded.";
100           std::vector<std::shared_ptr<TensorOperation>> non_offload_ops(
101             temp_operations.begin(), (temp_operations.begin() + last_invalid_op_pos));
102           std::vector<std::shared_ptr<TensorOperation>> offload_ops((temp_operations.begin() + last_invalid_op_pos),
103                                                                     temp_operations.end());
104 
105           // First set operations to offload_ops to prepare for copy
106           node->setOperations(offload_ops);
107           // Copy node (returns a copy of the node, but without children)
108           std::shared_ptr<DatasetNode> offload_node = node->Copy();
109           // Set the number of parallel workers of the new node to be the same as current one.
110           offload_node = offload_node->SetNumWorkers(node->NumWorkers());
111           node->setOperations(non_offload_ops);
112           // Insert the split offload map node above the original map node in the ir tree.
113           RETURN_IF_NOT_OK(node->InsertAbove(offload_node));
114           // Add the offload map node to nodes_to_offload
115           nodes_to_offload_.push_back(offload_node);
116         }
117       }
118     }
119   }
120   if (!offload_successful) {
121     // Offload of the original node without modification did not take place.
122     // Since map nodes are visited in reverse order, no other map ops for the input_column(s) can be offloaded after
123     // this.
124     for (std::string input_column : input_columns) {
125       end_of_pipeline_[input_column] = false;
126     }
127   }
128   return Status::OK();
129 }
130 
131 // constructor
NodeOffloadPass()132 NodeOffloadPass::NodeOffloadPass() {}
133 
134 // Walk the tree to collect the nodes to offload, fill the offload_json object, then remove the node.
RunOnTree(std::shared_ptr<DatasetNode> root_ir,bool * const modified)135 Status NodeOffloadPass::RunOnTree(std::shared_ptr<DatasetNode> root_ir, bool *const modified) {
136   MS_LOG(INFO) << "Pre pass: node offload pass started.";
137   // Create the offload node pass which can identify which nodes need to be offloaded.
138   std::unique_ptr<NodeOffloadPass::OffloadNodes> offload_nodes = std::make_unique<NodeOffloadPass::OffloadNodes>();
139   RETURN_IF_NOT_OK(offload_nodes->Run(root_ir, modified));
140 
141   // Update modified flag if there were any nodes identified to be offloaded
142   if (offload_nodes->nodes_to_offload().empty() == false) {
143     *modified = true;
144   }
145 
146   // Then, execute the offloading of any nodes that were set up to be offloaded
147   for (auto node : offload_nodes->nodes_to_offload()) {
148     RETURN_IF_NOT_OK(node->to_json(&offload_json_));
149     offload_json_["op_type"] = node->Name();
150 
151     // Add the single offloaded node to the list of offloaded nodes and remove the node from the ir tree
152     offload_json_list_.push_back(offload_json_);
153     RETURN_IF_NOT_OK(node->Drop());
154   }
155   MS_LOG(INFO) << "Pre pass: offload node removal pass complete.";
156   return Status::OK();
157 }
158 }  // namespace dataset
159 }  // namespace mindspore
160