• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2023-2024 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "plugin/device/ascend/hal/hardware/ge_graph_executor.h"
18 #include <tuple>
19 #include <functional>
20 #include <algorithm>
21 #include <utility>
22 #include <map>
23 #include <set>
24 #include <sstream>
25 #include "include/transform/graph_ir/types.h"
26 #include "include/transform/graph_ir/utils.h"
27 #include "include/common/utils/utils.h"
28 #include "include/common/debug/draw.h"
29 #include "include/common/debug/anf_ir_dump.h"
30 #include "include/common/utils/scoped_long_running.h"
31 #include "abstract/abstract_value.h"
32 #include "include/backend/kernel_graph.h"
33 #include "plugin/device/cpu/hal/device/cpu_device_address.h"
34 #include "runtime/device/ms_device_shape_transfer.h"
35 #include "runtime/device/device_address_utils.h"
36 #include "plugin/device/cpu/hal/device/cpu_memory_manager.h"
37 #include "plugin/device/ascend/hal/hccl_adapter/hccl_adapter.h"
38 #include "plugin/device/ascend/optimizer/ge_optimization.h"
39 #include "plugin/device/ascend/hal/common/ascend_utils.h"
40 #include "plugin/device/ascend/hal/hardware/ge_device_res_manager.h"
41 #include "plugin/device/ascend/hal/hardware/ge_utils.h"
42 #include "plugin/device/ascend/hal/device/ascend_memory_adapter.h"
43 #include "plugin/device/ascend/hal/device/ascend_device_address.h"
44 #include "plugin/device/ascend/hal/hardware/ge_graph_optimization.h"
45 #include "plugin/device/ascend/hal/device/ascend_device_synchronizer.h"
46 #include "include/backend/debug/profiler/profiling.h"
47 #include "include/backend/mem_reuse/mem_tracker.h"
48 #include "ge/ge_graph_compile_summary.h"
49 #include "kernel/kernel_build_info.h"
50 #include "op_proto/inc/array_ops.h"
51 #include "ops/nn_op_name.h"
52 #include "ops/array_ops.h"
53 #include "pybind_api/gil_scoped_long_running.h"
54 #include "include/common/utils/compile_cache_context.h"
55 using InputNameAndType = std::vector<std::pair<std::string, bool>>;
56 using Data = ::ge::op::Data;
57 using RefData = ::ge::op::RefData;
58 
namespace mindspore {
namespace device {
namespace ascend {
namespace {
// Ops whose GE-reported output shape must NOT overwrite the ME-inferred shape
// (see UpdateOutputNodeShape).
const std::set<std::string> kIgnoreGEShapeOps = {kSoftMarginLossOpName};
// Per-graph cached feature-memory sizes — presumably keyed by graph name; confirm against users.
mindspore::HashMap<std::string, size_t> feature_memorys;
// Per-graph cached stream counts — presumably keyed by graph name; confirm against users.
mindspore::HashMap<std::string, size_t> streams;
constexpr size_t kNeedRecycleOutput = 5;
// Markers passed to profiling for the start/end of host-info collection.
constexpr int kCollectHostInfoStart = 0;
constexpr int kCollectHostInfoEnd = 1;
69 
GetMeRetDataType(const AbstractBasePtr & cnode_data,std::vector<TypeId> * me_types)70 void GetMeRetDataType(const AbstractBasePtr &cnode_data, std::vector<TypeId> *me_types) {
71   MS_EXCEPTION_IF_NULL(cnode_data);
72 
73   if (cnode_data->isa<abstract::AbstractNone>()) {
74     return;
75   }
76 
77   if (cnode_data->isa<abstract::AbstractTensor>()) {
78     TypeId me_type = cnode_data->BuildType()->type_id();
79     if (me_type == kObjectTypeTensorType) {
80       me_type = dyn_cast<TensorType>(cnode_data->BuildType())->element()->type_id();
81       (void)me_types->emplace_back(me_type);
82     }
83     return;
84   }
85   if (cnode_data->isa<abstract::AbstractScalar>()) {
86     TypeId me_type = cnode_data->BuildType()->type_id();
87     (void)me_types->emplace_back(me_type);
88     return;
89   }
90   auto abstract_tuple = cnode_data->cast<abstract::AbstractTuplePtr>();
91   MS_EXCEPTION_IF_NULL(abstract_tuple);
92   auto elements = abstract_tuple->elements();
93   for (size_t i = 0; i < abstract_tuple->size(); ++i) {
94     GetMeRetDataType(elements[i], me_types);
95   }
96 }
97 
GetDefaultParams(const FuncGraphPtr & anf_graph,std::map<std::string,ShapeVector> * origin_shape)98 transform::TensorOrderMap GetDefaultParams(const FuncGraphPtr &anf_graph,
99                                            std::map<std::string, ShapeVector> *origin_shape) {
100   MS_EXCEPTION_IF_NULL(anf_graph);
101   transform::TensorOrderMap res;
102   for (auto &anf_node : anf_graph->parameters()) {
103     MS_EXCEPTION_IF_NULL(anf_node);
104     auto para = anf_node->cast<ParameterPtr>();
105     MS_EXCEPTION_IF_NULL(para);
106     if (para->has_default()) {
107       auto value = para->default_param();
108       MS_EXCEPTION_IF_NULL(value);
109       auto tensor = value->cast<std::shared_ptr<tensor::Tensor>>();
110       MS_EXCEPTION_IF_NULL(tensor);
111       origin_shape->emplace(para->name(), tensor->shape_c());
112       // need ref shape when auto parallel
113       auto build_shape = para->abstract()->BuildShape();
114       if (build_shape != nullptr) {
115         (void)tensor->MetaTensor::set_shape(build_shape->cast<abstract::ShapePtr>()->shape());
116         MS_LOG(INFO) << "ref abstract Parameter: " << para->name() << ", tensor: " << tensor->ToString();
117       }
118       res.emplace(para->name(), tensor);
119       MS_LOG(DEBUG) << "Parameter " << para->name() << " has default value.";
120     }
121   }
122   return res;
123 }
124 
RevertOriginShape(const KernelGraphPtr & anf_graph,const std::map<std::string,ShapeVector> & origin_shape)125 void RevertOriginShape(const KernelGraphPtr &anf_graph, const std::map<std::string, ShapeVector> &origin_shape) {
126   MS_EXCEPTION_IF_NULL(anf_graph);
127   transform::TensorOrderMap res;
128   for (auto &anf_node : anf_graph->parameters()) {
129     MS_EXCEPTION_IF_NULL(anf_node);
130     auto para = anf_node->cast<ParameterPtr>();
131     MS_EXCEPTION_IF_NULL(para);
132     if (para->has_default()) {
133       auto it = origin_shape.find(para->name());
134       if (it == origin_shape.end()) {
135         MS_LOG(ERROR) << "Failed to find input " << para->name() << " in input_shape " << origin_shape;
136         continue;
137       }
138       auto value = para->default_param();
139       MS_EXCEPTION_IF_NULL(value);
140       auto tensor = value->cast<std::shared_ptr<tensor::Tensor>>();
141       (void)tensor->MetaTensor::set_shape(it->second);
142       MS_LOG(INFO) << "ref abstract Parameter: " << para->name() << ", tensor: " << tensor->ToString();
143     }
144   }
145 }
146 
GetInputTensors(const FuncGraphPtr & anf_graph)147 std::vector<transform::GeTensorPtr> GetInputTensors(const FuncGraphPtr &anf_graph) {
148   MS_EXCEPTION_IF_NULL(anf_graph);
149   transform::TensorOrderMap init_input_map;
150   std::vector<tensor::TensorPtr> init_input;
151   for (auto &anf_node : anf_graph->parameters()) {
152     MS_EXCEPTION_IF_NULL(anf_node);
153     auto para = anf_node->cast<ParameterPtr>();
154     MS_EXCEPTION_IF_NULL(para);
155     if (para->has_default()) {
156       auto value = para->default_param();
157       MS_EXCEPTION_IF_NULL(value);
158       (void)init_input_map.emplace(para->name(), value->cast<std::shared_ptr<tensor::Tensor>>());
159     }
160   }
161   (void)std::transform(init_input_map.begin(), init_input_map.end(), std::back_inserter(init_input),
162                        [](const std::pair<std::string, tensor::TensorPtr> &item) { return item.second; });
163   return transform::ConvertInputTensors(init_input, kOpFormat_NCHW);
164 }
165 
RunGEInitGraph(const FuncGraphPtr & anf_graph)166 void RunGEInitGraph(const FuncGraphPtr &anf_graph) {
167   MS_LOG(DEBUG) << "ExecInitGraph start.";
168   MS_EXCEPTION_IF_NULL(anf_graph);
169 
170   transform::RunOptions run_options;
171   run_options.name = "init_subgraph." + anf_graph->ToString();
172 
173   auto graph_runner = transform::CheckAndGetGraphRunner(run_options);
174   if (graph_runner == nullptr) {
175     return;
176   }
177 
178   std::vector<transform::GeTensorPtr> ge_tensors;
179   std::vector<transform::GeTensorPtr> ge_outputs;
180   {
181     // Release GIL before calling into (potentially long-running) C++ code
182     GilReleaseWithCheck gil_release;
183     transform::Status ret = transform::RunGraph(graph_runner, run_options, ge_tensors, &ge_outputs);
184     if (ret != transform::Status::SUCCESS) {
185       MS_LOG(EXCEPTION) << "Exec " << run_options.name << " graph failed.";
186     }
187 
188     MS_LOG(DEBUG) << "Exec " << run_options.name << " graph success.";
189 
190     if ((ConfigManager::GetInstance().parallel_strategy() == ParallelStrategy::DISTRIBUTION) &&
191         (transform::GetGraphByName(BROADCAST_GRAPH_NAME) != nullptr)) {
192       run_options.name = BROADCAST_GRAPH_NAME;
193       ge_tensors = GetInputTensors(anf_graph);
194       ret = transform::RunGraph(graph_runner, run_options, ge_tensors, &ge_outputs);
195       if (ret != transform::Status::SUCCESS) {
196         MS_LOG(EXCEPTION) << "Exec BROADCAST_GRAPH_NAME failed.";
197       }
198       MS_LOG(DEBUG) << "Exec broadcast graph success.";
199     }
200   }
201 }
202 
UpdateOutputNodeShape(const AnfNodePtr & node,size_t index,TypeId output_type,const ShapeVector & output_shape)203 void UpdateOutputNodeShape(const AnfNodePtr &node, size_t index, TypeId output_type, const ShapeVector &output_shape) {
204   MS_EXCEPTION_IF_NULL(node);
205   std::string name;
206   if (node->isa<CNode>()) {
207     name = common::AnfAlgo::GetCNodeName(node);
208   }
209   size_t total_output_num = AnfAlgo::GetOutputElementNum(node);
210   if (index >= total_output_num) {
211     MS_LOG_WITH_NODE(EXCEPTION, node) << "Invalid output index " << index << ", node " << node->fullname_with_scope()
212                                       << " has " << total_output_num << " outputs.";
213   }
214   std::vector<TypeId> types = {};
215   std::vector<ShapeVector> shapes = {};
216   for (size_t i = 0; i < total_output_num; ++i) {
217     if (i == index && kIgnoreGEShapeOps.count(name) == 0) {
218       types.push_back(output_type);
219       shapes.push_back(output_shape);
220     } else {
221       types.push_back(common::AnfAlgo::GetOutputInferDataType(node, i));
222       (void)shapes.emplace_back(common::AnfAlgo::GetOutputInferShape(node, i));
223     }
224   }
225   common::AnfAlgo::SetOutputInferTypeAndShape(types, shapes, node.get());
226 }
227 
SetDynamicShapeAttr(const KernelGraphPtr & kernel_graph)228 void SetDynamicShapeAttr(const KernelGraphPtr &kernel_graph) {
229   MS_EXCEPTION_IF_NULL(kernel_graph);
230   auto nodes = TopoSort(kernel_graph->output());
231   for (auto &node : nodes) {
232     if (common::AnfAlgo::IsDynamicShape(node)) {
233       MS_LOG(DEBUG) << "Set Dynamic Shape Attr to Node : " << node->fullname_with_scope();
234       kernel_graph->SetGraphDynamicAttr(true);
235       return;
236     }
237   }
238 }
239 
EnableGraphInputZeroCopy(const KernelGraphPtr & graph)240 void EnableGraphInputZeroCopy(const KernelGraphPtr &graph) {
241   MS_EXCEPTION_IF_NULL(graph);
242   // Zero copy is only enabled for PyNative and Subgraph sink.
243   if ((!graph->has_flag(kFlagPyNativeRunInGraph) && !graph->has_flag(kFlagEnableZeroCopyInGraph)) ||
244       !graph->is_graph_run_mode()) {
245     return;
246   }
247   const auto &input_nodes = graph->input_nodes();
248   for (const auto &input : input_nodes) {
249     MS_EXCEPTION_IF_NULL(input);
250     if (AnfAlgo::OutputAddrExist(input, 0)) {
251       auto input_address = AnfAlgo::GetMutableOutputAddr(input, 0, false);
252       MS_EXCEPTION_IF_NULL(input_address);
253       input_address->set_is_ptr_persisted(false);
254       input_address->ClearFlag(device::kDeviceAddressFlagNotUsed);
255       MS_LOG(INFO) << "Enable zero copy for input " << input->DebugString();
256     }
257   }
258 }
259 
EnableGraphOutputZeroCopy(const KernelGraphPtr & graph)260 void EnableGraphOutputZeroCopy(const KernelGraphPtr &graph) {
261   MS_LOG(DEBUG) << "EnableGraphOutputZeroCopy start";
262   MS_EXCEPTION_IF_NULL(graph);
263   if ((!graph->has_flag(kFlagEnableZeroCopyInGraph)) || !graph->is_graph_run_mode()) {
264     MS_LOG(DEBUG) << "EnableGraphOutputZeroCopy start return";
265     return;
266   }
267   // Zero copy is only enabled for subgraph sink.
268   auto outputs = common::AnfAlgo::GetAllOutputWithIndex(graph->output());
269   for (const auto &output : outputs) {
270     const auto &node_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
271     const auto &node = node_with_index.first;
272     const auto &index = node_with_index.second;
273     MS_EXCEPTION_IF_NULL(node);
274     MS_LOG(DEBUG) << "EnableGraphOutputZeroCopy check node:" << node->DebugString();
275     if (node->isa<CNode>() && AnfAlgo::OutputAddrExist(node, index)) {
276       auto device_address = AnfAlgo::GetMutableOutputAddr(node, index, false);
277       MS_EXCEPTION_IF_NULL(device_address);
278       device_address->set_is_ptr_persisted(false);
279       MS_LOG(DEBUG) << "Disable ptr persisted in output node:" << node->DebugString() << " index:" << index
280                     << " address:" << device_address << " for graph:" << graph->ToString();
281     }
282   }
283 }
284 
285 struct GraphSummary {
286   size_t const_memory_size = 0;
287   size_t feature_memory_size = 0;
288   bool is_feature_memory_refreshable = false;
289   size_t stream_num = 0;
290   size_t event_num = 0;
291   std::vector<ShapeVector> output_shapes = {};
292   std::vector<ge::DataType> output_dtypes = {};
293   // pair<input_index, output_index>
294   std::vector<std::pair<uint32_t, uint32_t>> io_indexes;
295   bool is_static = false;
296 
297   GraphSummary() = default;
GraphSummarymindspore::device::ascend::__anon360b53500111::GraphSummary298   explicit GraphSummary(const ::ge::CompiledGraphSummaryPtr &graph_summary) {
299     MS_EXCEPTION_IF_NULL(graph_summary);
300     is_static = graph_summary->IsStatic();
301     if (is_static) {
302       ::ge::graphStatus status;
303       status = graph_summary->GetConstMemorySize(const_memory_size);
304       if (status != ::ge::GRAPH_SUCCESS) {
305         MS_LOG(EXCEPTION) << "GetConstMemorySize failed, status = " << status;
306       }
307       status = graph_summary->GetFeatureMemorySize(feature_memory_size);
308       if (status != ::ge::GRAPH_SUCCESS) {
309         MS_LOG(EXCEPTION) << "GetFeatureMemorySize failed, status = " << status;
310       }
311       status = graph_summary->GetFeatureMemoryBaseRefreshable(is_feature_memory_refreshable);
312       if (status != ::ge::GRAPH_SUCCESS) {
313         MS_LOG(EXCEPTION) << "GetFeatureMemoryBaseRefreshable failed, status = " << status;
314       }
315       status = graph_summary->GetStreamNum(stream_num);
316       if (status != ::ge::GRAPH_SUCCESS) {
317         MS_LOG(EXCEPTION) << "GetStreamNum failed, status = " << status;
318       }
319       status = graph_summary->GetEventNum(event_num);
320       if (status != ::ge::GRAPH_SUCCESS) {
321         MS_LOG(EXCEPTION) << "GetEventNum failed, status = " << status;
322       }
323       std::vector<::ge::Shape> ge_shapes;
324       status = graph_summary->GetOutputShapes(ge_shapes);
325       if (status != ::ge::GRAPH_SUCCESS) {
326         MS_LOG(EXCEPTION) << "GetOutputShapes failed, status = " << status;
327       }
328       (void)std::transform(ge_shapes.begin(), ge_shapes.end(), std::back_inserter(output_shapes),
329                            [](const ::ge::Shape &ge_shape) -> ShapeVector { return ge_shape.GetDims(); });
330       if (graph_summary->GetOutputDtypes(output_dtypes) != ::ge::GRAPH_SUCCESS) {
331         MS_LOG(EXCEPTION) << "GetOutputDtypes failed, status = " << status
332                           << ", maybe the execution mode is not as expected.";
333       }
334       if (graph_summary->GetIOIndexesWithSameAddr(io_indexes) != ::ge::GRAPH_SUCCESS) {
335         MS_LOG(EXCEPTION) << "GetIOIndexesWithSameAddr failed, status = " << status
336                           << ", maybe the execution mode is not as expected.";
337       }
338     } else {
339       MS_LOG(WARNING) << "Graph is not static, maybe the execution mode is not as expected.";
340     }
341   }
342 
ToStringmindspore::device::ascend::__anon360b53500111::GraphSummary343   std::string ToString() const {
344     std::stringstream ss;
345     ss << "const_memory_size[" << const_memory_size << "], feature_memory_size[" << feature_memory_size
346        << "], is_feature_memory_refreshable[" << is_feature_memory_refreshable << "], stream_num[" << stream_num
347        << "], event_num[" << event_num << "], output size[" << output_shapes.size() << "], is_static[" << is_static
348        << "]";
349     if (!output_shapes.empty()) {
350       if (output_shapes.size() != output_dtypes.size()) {
351         MS_LOG(WARNING) << "The output_dtypes size in summary is not equal to output_shapes size.";
352       }
353       for (size_t i = 0; i < output_shapes.size(); ++i) {
354         std::string shape_str = "[";
355         std::string dtype_str = "";
356         for (size_t j = 0; j < output_shapes[i].size(); ++j) {
357           if (j != output_shapes[i].size() - 1) {
358             shape_str += std::to_string(output_shapes[i][j]) + ",";
359           } else {
360             shape_str += std::to_string(output_shapes[i][j]) + "]";
361           }
362         }
363 
364         if (output_shapes[i].empty()) {
365           shape_str = "[]";
366         }
367         if (i < output_dtypes.size()) {
368           dtype_str += "[";
369           dtype_str += TransGeDtypeToString(output_dtypes[i]);
370           dtype_str += "]";
371         }
372         if (dtype_str.empty()) {
373           ss << ", output[" << i << "] shape = " << shape_str;
374         } else {
375           ss << ", output[" << i << "] shape = " << shape_str << " dtype = " << dtype_str;
376         }
377       }
378     }
379     if (!io_indexes.empty()) {
380       std::string io_indexes_str = "[";
381       for (auto io_index : io_indexes) {
382         io_indexes_str += "[" + std::to_string(io_index.first) + "," + std::to_string(io_index.second) + "]";
383       }
384       io_indexes_str += "]";
385       ss << ", io_indexes: " << io_indexes_str;
386     }
387 
388     return ss.str();
389   }
390 
391  private:
TransGeDtypeToStringmindspore::device::ascend::__anon360b53500111::GraphSummary392   std::string TransGeDtypeToString(const transform::GeDataType dtype) const {
393     std::string dtype_str = "";
394     if (transform::ge_dtype_str_map.find(dtype) != transform::ge_dtype_str_map.end()) {
395       dtype_str = transform::ge_dtype_str_map[dtype];
396     }
397     return dtype_str;
398   }
399 };
400 
FilterAllParameters(const KernelGraphPtr & kernel_graph)401 std::multimap<std::string, ParameterPtr> FilterAllParameters(const KernelGraphPtr &kernel_graph) {
402   MS_EXCEPTION_IF_NULL(kernel_graph);
403   std::multimap<std::string, ParameterPtr> ret;
404   std::vector<AnfNodePtr> todo = kernel_graph->input_nodes();
405   (void)todo.insert(todo.end(), kernel_graph->child_graph_result().begin(), kernel_graph->child_graph_result().end());
406   for (const auto &node : todo) {
407     MS_EXCEPTION_IF_NULL(node);
408     if (!node->isa<Parameter>()) {
409       continue;
410     }
411     auto parameter = node->cast<ParameterPtr>();
412     MS_EXCEPTION_IF_NULL(parameter);
413     std::string name = parameter->name();
414     (void)ret.emplace(name, parameter);
415   }
416   return ret;
417 }
418 
SetParameterKernelInfo(const AnfNodePtr & node,const std::shared_ptr<device::KernelInfo> & kernel_info)419 void SetParameterKernelInfo(const AnfNodePtr &node, const std::shared_ptr<device::KernelInfo> &kernel_info) {
420   MS_EXCEPTION_IF_NULL(kernel_info);
421   auto build_info = kernel_info->GetMutableSelectKernelBuildInfo();
422   if (!build_info) {
423     MS_LOG(ERROR) << "Parameter doesn't have build info: " << node->DebugString()
424                   << ", full name: " << node->fullname_with_scope();
425     return;
426   }
427   std::vector<TypeId> refresh_output_types = {common::AnfAlgo::GetOutputInferDataType(node, 0)};
428   build_info->SetOutputsDeviceType(refresh_output_types);
429 }
430 
SetKernelInfo(const AnfNodePtr & node)431 void SetKernelInfo(const AnfNodePtr &node) {
432   MS_EXCEPTION_IF_NULL(node);
433   // If kernel build info has been set up. skip
434   std::shared_ptr<device::KernelInfo> kernel_info =
435     std::dynamic_pointer_cast<device::KernelInfo>(node->kernel_info_ptr());
436   if (utils::isa<ParameterPtr>(node)) {
437     SetParameterKernelInfo(node, kernel_info);
438     return;
439   }
440 
441   if (!kernel_info) {
442     kernel_info = std::make_shared<device::KernelInfo>();
443     MS_EXCEPTION_IF_NULL(kernel_info);
444     node->set_kernel_info(kernel_info);
445   }
446 
447   auto build_info = kernel_info->GetMutableSelectKernelBuildInfo();
448   if (!build_info) {
449     auto builder = std::make_shared<kernel::KernelBuildInfo::KernelBuildInfoBuilder>();
450     MS_EXCEPTION_IF_NULL(builder);
451     build_info = builder->Build();
452   }
453 
454   const auto &output_with_indexs = common::AnfAlgo::GetAllOutputWithIndex(node);
455   std::vector<TypeId> output_infer_types;
456   std::vector<std::string> output_formats;
457   for (const auto &output_with_index : output_with_indexs) {
458     (void)output_infer_types.emplace_back(
459       common::AnfAlgo::GetOutputInferDataType(output_with_index.first, output_with_index.second));
460     (void)output_formats.emplace_back(kOpFormat_DEFAULT);
461   }
462   build_info->SetOutputsDeviceType(output_infer_types);
463   build_info->SetOutputsFormat(output_formats);
464   kernel_info->set_select_kernel_build_info(build_info);
465 }
466 
// Builds a placeholder ("fake") DF graph for anf_graph via AddFakeGraph,
// dumping the ANF IR before and after when IR dump is enabled.
// Returns false only when AddFakeGraph fails.
bool BuildFakeGraph(const FuncGraphPtr &anf_graph) {
  MS_EXCEPTION_IF_NULL(anf_graph);
#ifdef ENABLE_DUMP_IR
  auto context = MsContext::GetInstance();
  MS_EXCEPTION_IF_NULL(context);
  if (context->CanDump(kIntroductory)) {
    if (context->CanDump(kFully)) {
      draw::Draw("anf_graph_before_build_df_graph.dot", anf_graph);  // for debug
    }
    DumpIR("anf_graph_before_build_df_graph.ir", anf_graph, true, kWholeStack);
  }
#endif
  // NOTE(review): GE_TRAIN is presumably consumed by the GE graph build to pick
  // train vs. infer mode — confirm against the GE adapter.
  (void)setenv("GE_TRAIN", IsGeTrain() ? "1" : "0", 1);
  if (!AddFakeGraph(anf_graph)) {
    MS_LOG(ERROR) << "Add fake graph failed";
    return false;
  }
#ifdef ENABLE_DUMP_IR
  // `context` is declared in the first ENABLE_DUMP_IR block above; both blocks
  // compile out together when the flag is off.
  if (context->CanDump(kIntroductory)) {
    if (context->CanDump(kFully)) {
      draw::Draw("anf_graph_after_build_df_graph.dot", anf_graph);  // for debug
    }
    DumpIR("anf_graph_after_build_df_graph.ir", anf_graph, true, kWholeStack);
  }
#endif
  return true;
}
494 
ClearForwardOutputAddress(const KernelGraphPtr & graph,const DeviceContext * device_context)495 void ClearForwardOutputAddress(const KernelGraphPtr &graph, const DeviceContext *device_context) {
496   MS_EXCEPTION_IF_NULL(graph);
497   if (!graph->has_flag(kFlagPyNativeRunInGraph)) {
498     return;
499   }
500   const auto &input_nodes = graph->input_nodes();
501   for (const auto &input : input_nodes) {
502     MS_EXCEPTION_IF_NULL(input);
503     auto parameter = input->cast<ParameterPtr>();
504     if (parameter != nullptr) {
505       if (parameter->has_user_data(kForwardOutput)) {
506         auto device_address = AnfAlgo::GetMutableOutputAddr(parameter, 0);
507         auto new_address = runtime::DeviceAddressUtils::CloneEmptyDeviceAddress(device_address, device_context);
508         AnfAlgo::SetOutputAddr(new_address, 0, parameter.get());
509         MS_LOG(DEBUG) << "Clear old address " << device_address.get() << " and set new address " << new_address.get()
510                       << " to parameter " << parameter->name();
511       }
512     }
513   }
514 }
515 
516 class ContextReset {
517  public:
ContextReset(DeviceContext * device_context)518   explicit ContextReset(DeviceContext *device_context) : device_context_(device_context) {}
~ContextReset()519   ~ContextReset() {
520     if (device_context_ != nullptr && device_context_->device_res_manager_ != nullptr) {
521       device_context_->device_res_manager_->BindDeviceToCurrentThread(true);
522     }
523   }
524 
525  private:
526   DeviceContext *device_context_;
527 };
528 
// Registers a named task plus its compile-time memory allocation
// (size/pointer/type) with the memory tracker for later dump/analysis.
void UpdateTracker(const std::string &task_name, const std::string &node_name, const std::string &graph_str,
                   size_t size, void *device_ptr, device::tracker::MemType mem_type) {
  device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, task_name, node_name, graph_str);
  device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddCompileTimeMemInfo, task_name, size, device_ptr, mem_type);
}
534 
UpdateFMTracker(size_t feature_memory_size,const std::string & graph_name)535 void UpdateFMTracker(size_t feature_memory_size, const std::string &graph_name) {
536   device::tracker::CALL_MEMORY_TRACKER(AllocMemBlock, 0, feature_memory_size, "Ascend",
537                                        AscendMemAdapter::GetInstance().GetActualPeakMemory(), 0, 0, 0);
538   device::tracker::CALL_MEMORY_TRACKER(FreeMemBlock, 0, 0, 0);
539   device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, "RunGeGraph", "RunGeGraph", graph_name);
540   device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddCompileTimeMemInfo, "RunGeGraph", feature_memory_size, 0,
541                                                  device::tracker::MemType::kGeFeatureMemory);
542 }
543 
CacheFileExists(const std::string & name)544 bool CacheFileExists(const std::string &name) {
545   auto &compile_cache_context = CompileCacheContext::GetInstance();
546   auto dep_files_hash = compile_cache_context.CompileCacheDepFilesHash();
547   auto ge_graph_key = name;
548   if (!dep_files_hash.empty()) {
549     ge_graph_key = dep_files_hash + "_" + ge_graph_key;
550   }
551   auto ge_cache_path = Common::GetCompilerCachePath() + kGeCache;
552   ge_graph_key = NormalizeString(ge_graph_key);
553   auto cache_idx_file = ge_cache_path + "/" + ge_graph_key + ".idx";
554   struct stat buffer;
555   bool ret = stat(cache_idx_file.c_str(), &buffer) == 0;
556   MS_LOG(INFO) << "Cached index file name: " << cache_idx_file << " exists: " << ret;
557   return ret;
558 }
559 
560 }  // namespace
561 
AllocInputHostMemory(const KernelGraphPtr & kernel_graph) const562 void GeGraphExecutor::AllocInputHostMemory(const KernelGraphPtr &kernel_graph) const {
563   MS_EXCEPTION_IF_NULL(kernel_graph);
564   const auto &inputs = kernel_graph->inputs();
565   auto device_id = device_context_->device_context_key().device_id_;
566   for (const auto &input : inputs) {
567     auto builder = std::make_shared<kernel::KernelBuildInfo::KernelBuildInfoBuilder>();
568     builder->SetOutputsFormat({kOpFormat_DEFAULT});
569     std::vector<TypeId> output_type = {common::AnfAlgo::GetOutputInferDataType(input, 0)};
570     builder->SetOutputsDeviceType(output_type);
571     AnfAlgo::SetSelectKernelBuildInfo(builder->Build(), input.get());
572   }
573 
574   for (const auto &input_node : inputs) {
575     if (!input_node->isa<Parameter>()) {
576       MS_LOG(DEBUG) << input_node->fullname_with_scope() << " is not parameter, continue";
577       continue;
578     }
579     TypeId output_type_id = common::AnfAlgo::GetOutputInferDataType(input_node, 0);
580 
581     size_t tensor_size;
582     if (kernel_graph->is_dynamic_shape()) {
583       tensor_size = 0;
584     } else {
585       std::vector<size_t> shape = Convert2SizeT(common::AnfAlgo::GetOutputInferShape(input_node, 0));
586       size_t type_size = GetTypeByte(TypeIdToType(output_type_id));
587       tensor_size = std::accumulate(shape.begin(), shape.end(), type_size, std::multiplies<size_t>());
588     }
589 
590     auto input_with_index = std::make_pair(input_node, 0);
591     const auto kernel_tensor = AnfAlgo::CreateOutputKernelTensorWithDeviceInfo(
592       input_with_index, nullptr, tensor_size, kOpFormat_DEFAULT, output_type_id, {}, kAscendDevice, device_id);
593     auto device_address_ptr = std::make_shared<GeHostAddress>(kernel_tensor);
594     device_address_ptr->set_is_ptr_persisted(false);
595     AnfAlgo::SetOutputAddr(device_address_ptr, 0, input_node.get());
596   }
597 }
598 
// Creates host-side GeHostAddress objects for every real graph output so GE
// results can be fetched. Parameters already own addresses and are skipped.
// For nop-node outputs that forward a graph input, the address is shared with
// (and sized from) the real producing node.
void GeGraphExecutor::AllocOutputHostMemory(const KernelGraphPtr &kernel_graph) const {
  MS_EXCEPTION_IF_NULL(kernel_graph);
  auto outputs = common::AnfAlgo::GetAllOutputWithIndex(kernel_graph->output());
  auto device_id = device_context_->device_context_key().device_id_;
  for (const auto &output : outputs) {
    const auto &output_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
    auto &output_node = output_with_index.first;
    MS_EXCEPTION_IF_NULL(output_node);
    SetKernelInfo(output_node);

    // Parameter's memory is allocated earlier, and there is no need to reallocate memory if Parameter is output.
    if (output_node->isa<Parameter>()) {
      continue;
    }

    auto i = output_with_index.second;
    TypeId output_type_id = common::AnfAlgo::GetOutputInferDataType(output_node, i);
    // Size 0 here — presumably refreshed later once actual output sizes are
    // known; confirm against the run path.
    const auto kernel_tensor = AnfAlgo::CreateOutputKernelTensorWithDeviceInfo(
      output_with_index, nullptr, 0, kOpFormat_DEFAULT, output_type_id, {}, kAscendDevice, device_id);
    auto output_device_addr = std::make_shared<GeHostAddress>(kernel_tensor);
    AnfAlgo::SetOutputAddr(output_device_addr, i, output_node.get());

    if (common::AnfAlgo::IsNopNode(output_node)) {
      auto [real_node, real_idx] = common::AnfAlgo::GetPrevNodeOutput(output_node, i, true);
      if (real_node != output_node || real_idx != i) {
        // set output addr size if the input node is output.
        const auto &inputs = kernel_graph->inputs();
        if (std::any_of(inputs.begin(), inputs.end(),
                        [&real_node](const AnfNodePtr &input_node) { return real_node == input_node; })) {
          auto real_node_addr = AnfAlgo::GetMutableOutputAddr(real_node, real_idx);
          output_device_addr->SetSize(real_node_addr->GetSize());
        }
        // Share the freshly created address with the real producer as well.
        AnfAlgo::SetOutputAddr(output_device_addr, real_idx, real_node.get());
      }
    }
  }
}
636 
AllocConstMemory(const transform::RunOptions & options,const KernelGraphPtr & graph,size_t memory_size) const637 void GeGraphExecutor::AllocConstMemory(const transform::RunOptions &options, const KernelGraphPtr &graph,
638                                        size_t memory_size) const {
639   if (memory_size == 0) {
640     return;
641   }
642   MS_LOG(INFO) << "Start AllocConstMemory, memory_size: " << memory_size;
643   auto memory = ResManager()->AllocateMemory(memory_size);
644   if (memory == nullptr) {
645     MS_LOG(EXCEPTION) << "Allocate memory failed, memory size:" << memory_size << ", graph: " << graph->ToString();
646   }
647   if (common::IsNeedProfileMemory()) {
648     MS_LOG(WARNING) << "Need Profile Memory, alloc type: ConstMemory, size: " << memory_size
649                     << ", graph: " << graph->ToString() << ", device address addr: " << memory;
650   }
651   UpdateTracker("AllocConstMemory", "ConstMemory", graph->ToString(), memory_size, memory,
652                 device::tracker::MemType::kGeConst);
653   auto graph_runner = transform::GetGraphRunner();
654   MS_EXCEPTION_IF_NULL(graph_runner);
655   auto ret = graph_runner->SetConstMemory(options, memory, memory_size);
656   if (ret != transform::Status::SUCCESS) {
657     MS_LOG(EXCEPTION) << "SetConstMemory for graph " << options.name << " failed.";
658   }
659   MS_LOG(INFO) << "End AllocConstMemory";
660 }
661 
AllocFeatureMemory(const transform::RunOptions & options,size_t memory_size) const662 void GeGraphExecutor::AllocFeatureMemory(const transform::RunOptions &options, size_t memory_size) const {
663   if (memory_size == 0) {
664     return;
665   }
666   MS_LOG(INFO) << "Start AllocFeatureMemory, memory_size: " << memory_size;
667   auto memory_manager = ResManager()->mem_manager_;
668   MS_EXCEPTION_IF_NULL(memory_manager);
669   memory_manager->ResetDynamicMemory();
670   auto memory = memory_manager->MallocWorkSpaceMem(memory_size);
671   if (memory == nullptr) {
672     MS_LOG(EXCEPTION) << "AllocFeatureMemory error, memory not enough, memory size: " << memory_size;
673   }
674   auto graph_runner = transform::GetGraphRunner();
675   MS_EXCEPTION_IF_NULL(graph_runner);
676   auto ret = graph_runner->UpdateFeatureMemory(options, memory, memory_size);
677   if (ret != transform::Status::SUCCESS) {
678     MS_LOG(EXCEPTION) << "UpdateFeatureMemory for graph " << options.name << " failed.";
679   }
680   memory_manager->ResetDynamicMemory();
681   MS_LOG(INFO) << "End AllocFeatureMemory";
682 }
683 
AllocParameterMemory(const KernelGraphPtr & kernel_graph,std::set<KernelGraphPtr> * memo) const684 void GeGraphExecutor::AllocParameterMemory(const KernelGraphPtr &kernel_graph, std::set<KernelGraphPtr> *memo) const {
685   // Set Device Type to be same as Host Type, AssignStaticMemoryInput will ignore parameters without DeviceType
686   MS_EXCEPTION_IF_NULL(kernel_graph);
687   if (memo == nullptr) {
688     MS_LOG(INFO) << "Start AllocParameterMemory, kernel graph: " << kernel_graph->ToString();
689     std::set<KernelGraphPtr> memo_set;
690     AllocParameterMemory(kernel_graph, &memo_set);
691     MS_LOG(INFO) << "AllocParameterMemory finish.";
692     return;
693   } else if (memo->find(kernel_graph) != memo->end()) {
694     return;
695   }
696   (void)memo->insert(kernel_graph);
697   auto parameters = FilterAllParameters(kernel_graph);
698   for (const auto &iter : parameters) {
699     auto parameter = utils::cast<ParameterPtr>(iter.second);
700     if (parameter == nullptr) {
701       continue;
702     }
703     SetKernelInfo(parameter);
704   }
705   runtime::DeviceAddressUtils::CreateParameterDeviceAddress(device_context_, kernel_graph);
706   // call AssignStaticMemoryInput recursively
707   auto ms_context = MsContext::GetInstance();
708   MS_EXCEPTION_IF_NULL(ms_context);
709   auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
710   auto runtime_instance = device::KernelRuntimeManager::Instance().GetKernelRuntime(kAscendDevice, device_id);
711   MS_EXCEPTION_IF_NULL(runtime_instance);
712   runtime_instance->AssignStaticMemoryInput(*kernel_graph.get());
713 }
714 
InitGraphInfo(const FuncGraphPtr & graph)715 void GeGraphExecutor::InitGraphInfo(const FuncGraphPtr &graph) {
716   MS_EXCEPTION_IF_NULL(graph);
717   KernelGraphPtr kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
718   MS_EXCEPTION_IF_NULL(kg);
719   BuildInputDataGeTensor(kg);
720   BuildOutputDataGeTensor(kg);
721 }
722 
BuildInputDataGeTensor(const KernelGraphPtr & kernel_graph)723 void GeGraphExecutor::BuildInputDataGeTensor(const KernelGraphPtr &kernel_graph) {
724   MS_LOG(INFO) << "Start BuildInputDataGeTensor, kernel graph: " << kernel_graph->ToString();
725   MS_EXCEPTION_IF_NULL(kernel_graph);
726   std::vector<GeTensor> ge_inputs;
727   std::vector<DeviceAddress *> device_addrs;
728   std::vector<std::pair<AnfNodeWeakPtr, size_t>> need_update_input;
729   std::vector<AnfNodeWeakPtr> ge_input_nodes;
730   auto ge_input_list = kernel_graph->user_data<transform::GEInputList>();
731   if (ge_input_list) {
732     ge_input_nodes = ge_input_list->ge_inputs;
733   }
734   for (const auto &node_wptr : ge_input_nodes) {
735     auto node = node_wptr.lock();
736     if (!node) {
737       MS_LOG(ERROR) << "Get node lock failed, kerne graph: " << kernel_graph->ToString();
738       continue;
739     }
740     auto name = node->fullname_with_scope();
741     MS_LOG(INFO) << "Build input ge tensor: " << name << ", kernel graph: " << kernel_graph->graph_id();
742     auto output_addr = AnfAlgo::GetMutableOutputAddr(node, 0, false);
743     (void)device_addrs.emplace_back(output_addr.get());
744     auto shapes = trans::GetRuntimePaddingShape(node, 0);
745     auto host_type = common::AnfAlgo::GetOutputInferDataType(node, 0);
746     auto ge_tensor_desc = transform::TransformUtil::GetGeTensorDesc(shapes, host_type, kOpFormat_DEFAULT);
747     MS_EXCEPTION_IF_NULL(ge_tensor_desc);
748     ge_tensor_desc->SetPlacement(::ge::kPlacementDevice);
749     GeTensor ge_tensor(*ge_tensor_desc);
750     if (output_addr->GetMutablePtr() != nullptr) {
751       MS_LOG(INFO) << "Node: " << name << " Has addr, size: " << output_addr->GetSize();
752       if (ge_tensor.SetData(reinterpret_cast<uint8_t *>(output_addr->GetMutablePtr()), output_addr->GetSize(),
753                             [](void *) {}) != ::ge::GRAPH_SUCCESS) {
754         MS_LOG(EXCEPTION) << "SetData failed, ge input data " << ge_inputs.size() << " name: " << name
755                           << " size: " << output_addr->GetSize();
756       }
757       MS_LOG(INFO) << "ge input data " << ge_inputs.size() << " name: " << name << " size: " << output_addr->GetSize();
758     }
759     // The device address of input tensor may change every step.
760     // Always keep the input node address consistent with the input tensor address.
761     (void)need_update_input.emplace_back(node, ge_inputs.size());
762     (void)ge_inputs.emplace_back(std::move(ge_tensor));
763   }
764   input_datas_[kernel_graph.get()] = {ge_inputs, device_addrs, need_update_input};
765   MS_LOG(INFO) << "BuildInputDataGeTensor finish.";
766 }
767 
BuildOutputDataGeTensor(const KernelGraphPtr & kernel_graph)768 void GeGraphExecutor::BuildOutputDataGeTensor(const KernelGraphPtr &kernel_graph) {
769   MS_LOG(INFO) << "Start BuildOutputDataGeTensor, kernel graph: " << kernel_graph->ToString();
770   MS_EXCEPTION_IF_NULL(kernel_graph);
771   std::vector<GeTensor> ge_outputs;
772   std::vector<DeviceAddress *> device_addrs;
773   std::vector<std::pair<AnfNodeWeakPtr, size_t>> graph_outputs;
774   auto outputs = common::AnfAlgo::GetAllOutputWithIndex(kernel_graph->output());
775   for (const auto &output : outputs) {
776     const auto &output_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
777     auto &output_node = output_with_index.first;
778     auto index = output_with_index.second;
779     MS_EXCEPTION_IF_NULL(output_node);
780     if (HasAbstractMonad(output_node)) {
781       continue;
782     }
783     if (common::AnfAlgo::IsNoOuputNode(output_node)) {
784       continue;
785     }
786     auto real_index = output_node->isa<ValueNode>() ? 0 : index;
787     auto device_addr = AnfAlgo::GetMutableOutputAddr(output_node, real_index, false);
788     (void)device_addrs.emplace_back(device_addr.get());
789     auto shapes = trans::GetRuntimePaddingShape(output_node, real_index);
790     auto host_type = common::AnfAlgo::GetOutputInferDataType(output_node, real_index);
791     auto ge_tensor_desc = transform::TransformUtil::GetGeTensorDesc(shapes, host_type, kOpFormat_DEFAULT);
792     MS_EXCEPTION_IF_NULL(ge_tensor_desc);
793     ge_tensor_desc->SetPlacement(::ge::kPlacementDevice);
794     GeTensor ge_tensor(*ge_tensor_desc);
795     (void)ge_outputs.emplace_back(std::move(ge_tensor));
796     (void)graph_outputs.emplace_back(output_node, index);
797   }
798   MS_EXCEPTION_IF_CHECK_FAIL(
799     ge_outputs.size() == graph_outputs.size(),
800     "The size of ge_outputs and graph_outputs check error, kernel graph: " + kernel_graph->ToString());
801   output_datas_[kernel_graph.get()] = {ge_outputs, device_addrs, graph_outputs};
802   MS_LOG(INFO) << "BuildOutputDataGeTensor finish.";
803 }
804 
DeviceAddressPtr GeGraphExecutor::CreateOutputDeviceAddress(const KernelGraphPtr &kernel_graph,
                                                            const KernelWithIndex &output_with_index,
                                                            size_t need_alloc_output_cnt) const {
  // Creates the device address for one graph output. Memory is allocated here
  // unless the output is zero-copy (non-ValueNode with kFlagEnableZeroCopyInGraph)
  // or shares memory with an input through the graph's ref map.
  MS_EXCEPTION_IF_NULL(kernel_graph);
  auto output_node = output_with_index.first;
  MS_EXCEPTION_IF_NULL(output_node);
  auto ref_map = kernel_graph->GetRefMap();

  auto ms_context = MsContext::GetInstance();
  MS_EXCEPTION_IF_NULL(ms_context);
  auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
  // A ValueNode always exposes its value at output index 0.
  auto real_index = output_node->isa<ValueNode>() ? 0 : output_with_index.second;
  TypeId output_type_id = common::AnfAlgo::GetOutputInferDataType(output_node, real_index);
  size_t type_size = GetTypeByte(TypeIdToType(output_type_id));
  auto shapes = trans::GetRuntimePaddingShape(output_node, real_index);
  // Byte size = element size * product of dims; a scalar (empty shape) is one element.
  auto tensor_size =
    shapes.empty() ? type_size : std::accumulate(shapes.begin(), shapes.end(), type_size, std::multiplies<size_t>());
  // When ValueNode is a graph output, runtime does not manage this memory
  // output in ref_map, mem same is input
  bool need_not_alloc = (kernel_graph->has_flag(kFlagEnableZeroCopyInGraph) && !output_node->isa<ValueNode>()) ||
                        (ref_map.find(output_with_index) != ref_map.end());
  void *mem = need_not_alloc ? nullptr : ResManager()->AllocateMemory(tensor_size);

  if (common::IsNeedProfileMemory() && !need_not_alloc) {
    MS_LOG(WARNING) << "Need Profile Memory, alloc type: ValueNodeOutput, size:" << tensor_size
                    << ", graph: " << kernel_graph->ToString() << ", node: " << output_node->fullname_with_scope()
                    << ", device address addr: " << mem;
  }
  if (!need_not_alloc) {
    UpdateTracker("ValueNodeOutput", output_node->fullname_with_scope(), kernel_graph->ToString(), tensor_size, mem,
                  device::tracker::MemType::kConstantValue);
  }

  const auto kernel_tensor = AnfAlgo::CreateOutputKernelTensorWithDeviceInfo(
    {output_node, real_index}, mem, tensor_size, kOpFormat_DEFAULT, output_type_id, {}, kAscendDevice, device_id);
  auto output_device_addr = std::make_shared<AscendDeviceAddress>(kernel_tensor);
  if (ref_map.find(output_with_index) != ref_map.end()) {
    auto input_with_index = ref_map[output_with_index];
    auto input_device_address = AnfAlgo::GetMutableOutputAddr(input_with_index.first, input_with_index.second, false);
    MS_EXCEPTION_IF_NULL(input_device_address);
    MS_LOG(INFO) << "The output node " << output_node->fullname_with_scope()
                 << " is in ref_map, set the same device_address ptr as the corresponding input, input node: "
                 << input_with_index.first->fullname_with_scope();
    // Update the reference count of device address.
    output_device_addr->set_pointer_ref_count(input_device_address->pointer_ref_count());
    output_device_addr->IncreaseOriginalRefCount();
    output_device_addr->ResetRefCount();
  }
  output_device_addr->set_device_synchronizer(std::make_shared<AscendDeviceSynchronizer>());
  // Persist the pointer so the runtime does not reassign/free it behind GE's back.
  output_device_addr->set_is_ptr_persisted(true);
  // NOTE(review): kNeedRecycleOutput appears to gate pool recycling to graphs with
  // few freshly-allocated outputs — confirm the intended threshold semantics.
  if (IsMemoryPoolRecycle() && need_alloc_output_cnt <= kNeedRecycleOutput) {
    MS_LOG(INFO) << "Set Memory Pool Recycle, graph: " << kernel_graph->ToString()
                 << ", node: " << output_node->fullname_with_scope();
    output_device_addr->set_from_persistent_mem(true);
    output_device_addr->set_need_recycle(true);
  }
  return output_device_addr;
}
863 
AllocOutputMemory(const KernelGraphPtr & kernel_graph) const864 void GeGraphExecutor::AllocOutputMemory(const KernelGraphPtr &kernel_graph) const {
865   MS_EXCEPTION_IF_NULL(kernel_graph);
866   MS_LOG(INFO) << "Start AllocOutputMemory, kernel graph: " << kernel_graph->ToString();
867 
868   auto outputs = common::AnfAlgo::GetAllOutputWithIndex(kernel_graph->output());
869   auto ref_map = kernel_graph->GetRefMap();
870   size_t need_alloc_output_cnt = 0;
871   for (const auto &output : outputs) {
872     const auto &output_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
873     auto &output_node = output_with_index.first;
874     if (output_node->isa<Parameter>() || output_node->isa<ValueNode>()) {
875       continue;
876     }
877     if (ref_map.find(output_with_index) != ref_map.end()) {
878       continue;
879     }
880     need_alloc_output_cnt++;
881   }
882 
883   for (const auto &output : outputs) {
884     const auto &output_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
885     auto &output_node = output_with_index.first;
886     MS_EXCEPTION_IF_NULL(output_node);
887     SetKernelInfo(output_node);
888 
889     // Parameter's memory is allocated earlier, and there is no need to reallocate memory if Parameter is output.
890     if (AnfAlgo::OutputAddrExist(output_node, output_with_index.second, false) || output_node->isa<Parameter>()) {
891       MS_LOG(INFO) << "The device_address of output node " << output_node->fullname_with_scope()
892                    << " is already exist, skip.";
893       continue;
894     }
895 
896     auto output_device_addr = CreateOutputDeviceAddress(kernel_graph, output_with_index, need_alloc_output_cnt);
897     AnfAlgo::SetOutputAddr(output_device_addr, output_with_index.second, output_node.get());
898     MS_LOG(INFO) << "Output node info: (name " << output_node->fullname_with_scope() << ", "
899                  << output_node->DebugString() << " ), output size: " << output_device_addr->GetSize()
900                  << ", device_address: " << output_device_addr;
901     // When both the input and output of NopNode are used as outputs, different memory needs to be allocated for them.
902   }
903   MS_LOG(INFO) << "AllocOutputMemory finish.";
904 }
905 
ResManager() const906 GeDeviceResManager *GeGraphExecutor::ResManager() const {
907   MS_EXCEPTION_IF_NULL(device_context_);
908   auto res_manager = dynamic_cast<GeDeviceResManager *>(device_context_->device_res_manager_.get());
909   MS_EXCEPTION_IF_NULL(res_manager);
910   return res_manager;
911 }
912 
PreprocessBeforeRun(const KernelGraphPtr & graph)913 void GeGraphExecutor::PreprocessBeforeRun(const KernelGraphPtr &graph) {
914   auto ret = CompileGraph(graph, {});
915   if (!ret) {
916     MS_LOG(EXCEPTION) << "Compile graph fail, graph id: " << graph->graph_id();
917   }
918 }
919 
BuildGraph(const KernelGraphPtr & graph,const transform::TensorOrderMap & tensor_order_map)920 bool GeGraphExecutor::BuildGraph(const KernelGraphPtr &graph, const transform::TensorOrderMap &tensor_order_map) {
921   std::set<KernelGraphPtr> memo;
922   GEGraphOptimization::GetInstance().OptimizeGEGraph(graph, &memo);
923   auto &compile_cache_context = CompileCacheContext::GetInstance();
924   auto use_compile_cache = compile_cache_context.UseCompileCache();
925   auto name = GetGraphName(graph);
926   bool has_cache = CacheFileExists(name);
927   if (use_compile_cache && has_cache) {
928     MS_LOG(INFO) << "Use ge compile cache, and skip specific optimization and ge_adapter execution";
929     if (!BuildFakeGraph(graph)) {
930       return false;
931     }
932   } else {
933     (void)BuildDFGraph(graph, tensor_order_map, false);
934   }
935   return true;
936 }
937 
void GeGraphExecutor::AllocMemory(const KernelGraphPtr &graph) {
  // Allocates device memory for parameters and outputs, then enables the
  // zero-copy paths for graph inputs and outputs (order matters: output
  // allocation may reuse parameter addresses via the ref map).
  AllocParameterMemory(graph);
  AllocOutputMemory(graph);
  EnableGraphInputZeroCopy(graph);
  EnableGraphOutputZeroCopy(graph);
}
944 
bool GeGraphExecutor::CompileGraph(const KernelGraphPtr &graph,
                                   const std::map<string, string> & /* compile_options */) {
  // Full GE compile pipeline for a kernel graph: build (or replay from cache),
  // compile through the GE graph runner, allocate const/feature/IO memory, and
  // stamp run-mode flags on the graph. Returns false only if fake-graph replay fails.
  MS_EXCEPTION_IF_NULL(graph);
  MS_LOG(INFO) << "ge graph executor compile graph " << graph->ToString();
  auto &compile_cache_context = CompileCacheContext::GetInstance();
  auto use_compile_cache = compile_cache_context.UseCompileCache();
  // Parameter shapes may be temporarily altered for compilation; origin_shape
  // records the originals so they can be restored at the end.
  std::map<std::string, ShapeVector> origin_shape;
  const auto &tensor_order_map = GetDefaultParams(graph, &origin_shape);
  auto name = GetGraphName(graph);
  bool has_cache = CacheFileExists(name);
  if (use_compile_cache && has_cache) {
    MS_LOG(INFO) << "Use ge compile cache, and skip specific optimization and ge_adapter execution";
    std::set<KernelGraphPtr> memo;
    GEGraphOptimization::GetInstance().OptimizeGEGraph(graph, &memo);
    if (!BuildFakeGraph(graph)) {
      return false;
    }
  } else {
    (void)BuildGraph(graph, tensor_order_map);
  }
  SetDynamicShapeAttr(graph);
  transform::RunOptions run_options;
  run_options.name = GetGraphName(graph);
  auto graph_runner = transform::GetGraphRunner();
  if (graph_runner == nullptr) {
    MS_LOG(EXCEPTION) << "Can not found GraphRunner.";
  }
  // create loop var
  RunInitGraph(run_options.name);
  if (graph->is_dynamic_shape()) {
    // Dynamic shape: no compile-time summary is available, just compile.
    // Release GIL before calling into (potentially long-running) C++ code
    GilReleaseWithCheck gil_release;
    auto ret = graph_runner->CompileGraph(run_options);
    if (ret != transform::Status::SUCCESS) {
      MS_LOG(EXCEPTION) << "Compile graph " << run_options.name << " failed.";
    }
  } else {
    // Static shape: compile and fetch the summary to size const/feature memory.
    ::ge::CompiledGraphSummaryPtr ge_graph_summary = nullptr;
    {
      // Release GIL before calling into (potentially long-running) C++ code
      GilReleaseWithCheck gil_release;
      auto ret = graph_runner->CompileGraph(run_options, &ge_graph_summary);
      if (ret != transform::Status::SUCCESS) {
        MS_LOG(EXCEPTION) << "Compile graph " << run_options.name << " failed.";
      }
    }
    GraphSummary summary(ge_graph_summary);
    MS_LOG(INFO) << "Graph " << run_options.name << " summary: " << summary.ToString();
    // Cache sizes for GetGraphFeatureMemory and profiling queries.
    feature_memorys[run_options.name] = summary.feature_memory_size;
    streams[run_options.name] = summary.stream_num;
    AllocConstMemory(run_options, graph, summary.const_memory_size);
    AllocFeatureMemory(run_options, summary.feature_memory_size);
    AddRefCorrespondPairs(graph, summary.io_indexes);
  }
  AllocMemory(graph);

  graph->set_run_mode(RunMode::kGraphMode);
  graph->set_memory_managed_by_ge(true);
  if (ConfigManager::GetInstance().dataset_mode() == DatasetMode::DS_SINK_MODE) {
    graph->set_is_loop_count_sink(true);
  }
  // Restore any parameter shapes changed by GetDefaultParams above.
  RevertOriginShape(graph, origin_shape);
  return true;
}
1009 
AddRefCorrespondPairs(const KernelGraphPtr & graph,const std::vector<std::pair<uint32_t,uint32_t>> & io_indexes) const1010 void GeGraphExecutor::AddRefCorrespondPairs(const KernelGraphPtr &graph,
1011                                             const std::vector<std::pair<uint32_t, uint32_t>> &io_indexes) const {
1012   MS_LOG(INFO) << "Start convert io_indexes to ref_map, kernel graph: " << graph->ToString();
1013   MS_EXCEPTION_IF_NULL(graph);
1014 
1015   std::map<session::AnfWithOutIndex, session::AnfWithOutIndex> ref_out_in_map = {};
1016   auto graph_inputs_all = graph->parameters();
1017   std::vector<AnfNodePtr> graph_inputs = {};
1018   for (auto &node : graph_inputs_all) {
1019     MS_EXCEPTION_IF_NULL(node);
1020     auto abs = node->abstract();
1021     MS_EXCEPTION_IF_NULL(abs);
1022     if (HasAbstractMonad(node) || abs->isa<abstract::AbstractSequence>()) {
1023       MS_LOG(INFO) << "Input node: " << node->DebugString() << " is a monad or tuple/list parameter, skip.";
1024       continue;
1025     }
1026     graph_inputs.emplace_back(node);
1027   }
1028 
1029   std::vector<common::KernelWithIndex> graph_outputs_all = {};
1030   common::AnfAlgo::GetRealInputs(graph->get_return(), &graph_outputs_all);
1031   std::vector<common::KernelWithIndex> graph_outputs = {};
1032 
1033   for (auto &node_with_index : graph_outputs_all) {
1034     if (common::AnfAlgo::IsNoOuputNode(node_with_index.first) || HasAbstractMonad(node_with_index.first)) {
1035       MS_LOG(INFO) << "Output node: " << node_with_index.first->fullname_with_scope()
1036                    << " is a no output node or monad node, skip.";
1037       continue;
1038     }
1039 
1040     graph_outputs.emplace_back(node_with_index);
1041   }
1042 
1043   for (auto in_out_index : io_indexes) {
1044     if (in_out_index.first >= graph_inputs.size() || in_out_index.second >= graph_outputs.size()) {
1045       MS_LOG(EXCEPTION) << "The io_indexes out of range, input index: " << in_out_index.first
1046                         << ", output index: " << in_out_index.second << ", graph input size: " << graph_inputs.size()
1047                         << ", graph output size: " << graph_outputs.size();
1048     }
1049     session::AnfWithOutIndex origin_node = std::make_pair(graph_inputs[in_out_index.first], 0);
1050     session::AnfWithOutIndex final_node = graph_outputs[in_out_index.second];
1051     if (origin_node.first == final_node.first) {
1052       MS_LOG(INFO) << "The origin node is same as final node, node: " << origin_node.first->fullname_with_scope();
1053       continue;
1054     }
1055     if (ref_out_in_map.count(final_node) != 0) {
1056       MS_LOG(INFO) << "The node is already in ref_out_in_map, node: " << final_node.first->fullname_with_scope()
1057                    << ", index: " << final_node.second;
1058       continue;
1059     }
1060     // if input node is not abstract ref, set ref may cause memory reuse error
1061     auto abs = origin_node.first->abstract();
1062     if (!abs->isa<abstract::AbstractRefTensor>()) {
1063       MS_LOG(INFO) << "The node is not abstract tensor: " << final_node.first->fullname_with_scope()
1064                    << ", index: " << final_node.second;
1065       continue;
1066     }
1067 
1068     ref_out_in_map.emplace(final_node, origin_node);
1069     MS_LOG(INFO) << "Convert io_index [" << in_out_index.first << ", " << in_out_index.second
1070                  << "] to ref_out_in_map, final_node: " << final_node.first->fullname_with_scope()
1071                  << ", index:" << final_node.second << ", origin_node: " << origin_node.first->fullname_with_scope()
1072                  << ", index: " << origin_node.second;
1073   }
1074 
1075   graph->set_ref_out_in_map(ref_out_in_map);
1076 }
1077 
bool GeGraphExecutor::CompileGraph(const FuncGraphPtr &graph, const std::map<string, string> &compile_options) {
  // FuncGraph-level compile entry: in ref mode delegates to the KernelGraph
  // overload; otherwise runs the legacy (non-ref-mode) path with host-side
  // input/output memory. Profiling host info is reported on every exit path.
  MS_EXCEPTION_IF_NULL(graph);

  auto graph_name = GetGraphName(graph);
  profiler::CollectHostInfo("Ascend", "CompileGraph", "GeCompileGraph_" + graph_name, 1, 0, kCollectHostInfoStart);

  // cppcheck-suppress unreadVariable
  ContextReset reset_context(device_context_);
  KernelGraphPtr kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
  MS_EXCEPTION_IF_NULL(kg);
  if (IsEnableRefMode()) {
    auto ret = CompileGraph(kg, compile_options);
    profiler::CollectHostInfo("Ascend", "CompileGraph", "GeCompileGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
    return ret;
  } else {
    // delete SetCPUMemManager when delete env MS_DISABLE_REF_MODE
    ResManager()->SetCPUMemManager();
    // origin_shape records parameter shapes altered for compilation so they can
    // be restored by RevertOriginShape below.
    std::map<std::string, ShapeVector> origin_shape;
    const auto &tensor_order_map = GetDefaultParams(graph, &origin_shape);
    auto &compile_cache_context = CompileCacheContext::GetInstance();
    auto use_compile_cache = compile_cache_context.UseCompileCache();
    if (use_compile_cache) {
      MS_LOG(INFO) << "Use ge compile cache, and skip specific optimization and ge_adapter execution";
      std::set<KernelGraphPtr> memo;
      GEGraphOptimization::GetInstance().OptimizeGEGraph(kg, &memo);
      if (!BuildFakeGraph(kg)) {
        profiler::CollectHostInfo("Ascend", "CompileGraph", "GeCompileGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
        return false;
      }
    } else {
      (void)BuildGraph(kg, tensor_order_map);
    }
    SetDynamicShapeAttr(kg);
    // Non-ref mode keeps graph IO on host memory.
    AllocInputHostMemory(kg);
    AllocOutputHostMemory(kg);
    kg->set_run_mode(RunMode::kGraphMode);
    if (ConfigManager::GetInstance().dataset_mode() == DatasetMode::DS_SINK_MODE) {
      kg->set_is_loop_count_sink(true);
    }
    // copy init weight to device
    RunGEInitGraph(kg);
    RevertOriginShape(kg, origin_shape);
    profiler::CollectHostInfo("Ascend", "CompileGraph", "GeCompileGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
    return true;
  }
}
1124 
// Transfers GE run results to the graph output nodes by zero-copying each GE
// tensor's host buffer into the node's device address (pointer + deleter are
// handed over, so no data copy happens). Device-placed GE outputs and string
// outputs are rejected. Assumes graph_outputs, ge_outputs and me_types are
// index-aligned — callers must guarantee equal sizes (not checked here).
void SetOutputs(const std::vector<KernelWithIndex> &graph_outputs,
                const std::vector<transform::GeTensorPtr> &ge_outputs, const std::vector<TypeId> &me_types) {
  for (size_t i = 0; i < graph_outputs.size(); ++i) {
    const auto &[output_node, idx] = common::AnfAlgo::FetchRealNodeSkipMonadControl(graph_outputs[i]);
    const auto &tensor = ge_outputs[i];
    auto output_addr = AnfAlgo::GetMutableOutputAddr(output_node, idx);
    ::ge::Placement dp = tensor->GetTensorDesc().GetPlacement();
    // Take ownership of GE's data buffer: ResetData releases it from the GE
    // tensor, and the captured deleter frees it when the device address dies.
    auto &&ge_data_uni = tensor->ResetData();
    auto deleter = ge_data_uni.get_deleter();
    auto ge_data = ge_data_uni.release();
    MS_EXCEPTION_IF_NULL(ge_data);
    if (dp == ::ge::kPlacementHost) {
      // Zero-copy requires the buffer to meet the 64-byte alignment contract.
      constexpr int64_t kTensorAlignBytes = 64;
      if (reinterpret_cast<uintptr_t>(ge_data) % kTensorAlignBytes != 0) {
        MS_LOG(EXCEPTION) << "Skip zero-copy ge tensor " << reinterpret_cast<uintptr_t>(ge_data)
                          << ", bytes not aligned with expected.";
      }
      if (me_types[i] == TypeId::kObjectTypeString) {
        MS_LOG(EXCEPTION) << "It is not supported that Output node " << output_node->DebugString()
                          << "'s output data type is string now.";
      }
      MS_LOG(DEBUG) << "Zero-copy ge tensor " << reinterpret_cast<uintptr_t>(ge_data) << " as aligned with "
                    << kTensorAlignBytes << " types.";
      // The address now borrows GE's buffer: not persisted, not pool-managed,
      // freed by the GE-provided deleter.
      output_addr->set_is_ptr_persisted(false);
      output_addr->set_from_mem_pool(false);
      output_addr->set_deleter(deleter);
      output_addr->set_ptr(ge_data);
      output_addr->SetSize(tensor->GetSize());
    } else {
      MS_LOG(EXCEPTION) << "It is not supported that Output node " << output_node->DebugString()
                        << "'s output data's placement is device now.";
    }
    // Propagate the actual (possibly dynamic) output shape back onto the node.
    auto actual_shapes = tensor->GetTensorDesc().GetShape().GetDims();
    UpdateOutputNodeShape(output_node, idx, me_types[i], actual_shapes);
  }
}
1161 
// Writes one GE output tensor into the device address of one graph output node.
// ValueNode outputs are not runtime-managed: GE's buffer is freed immediately.
// Otherwise the GE buffer is handed to the device address (zero-copy); if GE
// placed the data on host, device memory is allocated and the data is copied up.
void SetOutput(GeDeviceResManager *res_manager, GeTensor *ge_output, const AnfNodePtr &output_node, size_t idx) {
  if (output_node->isa<ValueNode>()) {
    // ValueNode keeps its own constant memory; release GE's buffer right away.
    auto &&ge_data_uni = ge_output->ResetData();
    auto deleter = ge_data_uni.get_deleter();
    auto ge_data = ge_data_uni.release();
    deleter(ge_data);
    return;
  }
  // Dynamic dims (< 0) must have been resolved by run time.
  auto actual_shapes = ge_output->GetTensorDesc().GetShape().GetDims();
  for (size_t i = 0; i < actual_shapes.size(); ++i) {
    if (actual_shapes[i] < 0) {
      MS_LOG(EXCEPTION) << "Output shape must be greater than 0, but got " << actual_shapes;
    }
  }
  auto output_addr = AnfAlgo::GetMutableOutputAddr(output_node, idx, false);
  output_addr->SetSize(ge_output->GetSize());
  // Take ownership of GE's buffer: released from the tensor, freed via deleter.
  auto &&ge_data_uni = ge_output->ResetData();
  auto deleter = ge_data_uni.get_deleter();
  auto ge_data = ge_data_uni.release();
  MS_EXCEPTION_IF_NULL(ge_data);
  output_addr->set_is_ptr_persisted(false);
  output_addr->set_from_mem_pool(false);
  output_addr->set_deleter(deleter);
  output_addr->set_ptr(ge_data);
  auto placement = ge_output->GetTensorDesc().GetPlacement();
  if (placement == ::ge::kPlacementHost) {
    // Host-placed result: allocate device memory and upload the host data.
    // NOTE(review): set_ptr(mem) replaces ge_data set just above; presumably the
    // earlier set_deleter still releases ge_data — confirm ownership handling.
    MS_LOG(DEBUG) << output_node->DebugString() << "'s output data's placement is host";
    size_t size = ge_output->GetSize();
    void *mem = res_manager->AllocateMemory(size);
    if (mem == nullptr) {
      MS_LOG(EXCEPTION) << "Allocate memory failed, memory size:" << size
                        << ", output_node: " << output_node->ToString();
    }
    output_addr->set_from_mem_pool(true);
    output_addr->set_ptr(mem);
    auto *ascend_addr = dynamic_cast<AscendDeviceAddress *>(output_addr.get());
    MS_EXCEPTION_IF_NULL(ascend_addr);
    ascend_addr->SyncHostToDevice(size, ge_data);
  }
  // Update shape in kernel tensor.
  const auto &kernel_tensor = AnfAlgo::GetOutputKernelTensor(output_node, idx);
  MS_EXCEPTION_IF_NULL(kernel_tensor);
  kernel_tensor->SetShapeVector(actual_shapes);
  MS_LOG(INFO) << "[ZeroCopy] Update output " << output_node->DebugString() << " address to "
               << output_addr->GetMutablePtr() << ", shape:" << actual_shapes
               << ", type: " << TypeIdToString(output_addr->type_id()) << ", format: " << output_addr->format();
}
1209 
SetDynamicOutputs(const std::vector<KernelWithIndex> & graph_outputs,std::vector<GeTensor> * ge_outputs,GeDeviceResManager * res_manager)1210 void SetDynamicOutputs(const std::vector<KernelWithIndex> &graph_outputs, std::vector<GeTensor> *ge_outputs,
1211                        GeDeviceResManager *res_manager) {
1212   MS_EXCEPTION_IF_NULL(res_manager);
1213   size_t ge_outputs_index = 0;
1214   size_t ge_outputs_size = ge_outputs->size();
1215   for (size_t i = 0; i < graph_outputs.size(); ++i) {
1216     const auto &[output_node, idx] = common::AnfAlgo::FetchRealNodeSkipMonadControl(graph_outputs[i]);
1217     MS_EXCEPTION_IF_NULL(output_node);
1218     if (HasAbstractMonad(output_node)) {
1219       continue;
1220     }
1221     if (common::AnfAlgo::IsNoOuputNode(output_node)) {
1222       continue;
1223     }
1224     if (ge_outputs_index >= ge_outputs_size) {
1225       MS_LOG(EXCEPTION) << "GE data access is out of bounds, which the current index value is " << ge_outputs_index
1226                         << ", the total number of GE output is " << ge_outputs_size << ".";
1227     }
1228     SetOutput(res_manager, &((*ge_outputs)[ge_outputs_index++]), output_node, idx);
1229   }
1230 }
1231 
GetGraphFeatureMemory(const FuncGraphPtr & graph) const1232 size_t GeGraphExecutor::GetGraphFeatureMemory(const FuncGraphPtr &graph) const {
1233   MS_EXCEPTION_IF_NULL(graph);
1234   auto graph_name = GetGraphName(graph);
1235   auto iter = feature_memorys.find(graph_name);
1236   if (iter == feature_memorys.end()) {
1237     MS_LOG(EXCEPTION) << "Graph " << graph_name << " feature memory not found.";
1238   }
1239   auto stream_iter = streams.find(graph_name);
1240   if (stream_iter == streams.end()) {
1241     MS_LOG(EXCEPTION) << "Graph " << graph_name << " stream not found.";
1242   }
1243   MS_LOG(WARNING) << "Need Profile Memory, graph: " << graph_name << ", stream: " << stream_iter->second;
1244   auto max_static_memory_size = ResManager()->GetMaxUsedMemorySize();
1245   auto feature_memory_size = iter->second;
1246   auto total_memory_size = max_static_memory_size + feature_memory_size;
1247   AscendMemAdapter::GetInstance().UpdateActualPeakMemory(total_memory_size);
1248   UpdateFMTracker(feature_memory_size, graph_name);
1249   return feature_memory_size;
1250 }
CurGraphSinkSize(std::string graph_name)1251 int64_t GeGraphExecutor::CurGraphSinkSize(std::string graph_name) {
1252   int64_t sink_size = -1;
1253   auto result = graph_sink_size_.find(graph_name);
1254   if (result != graph_sink_size_.end()) {
1255     sink_size = result->second;
1256   } else {
1257     auto ms_context = MsContext::GetInstance();
1258     MS_EXCEPTION_IF_NULL(ms_context);
1259     if (ConfigManager::GetInstance().dataset_mode() == DS_SINK_MODE &&
1260         ms_context->get_param<bool>(MS_CTX_ENABLE_LOOP_SINK)) {
1261       sink_size = ConfigManager::GetInstance().iter_num();
1262     }
1263     MS_LOG(INFO) << "Graph [" << graph_name << "] sink size is " << sink_size;
1264     graph_sink_size_.insert(std::pair(graph_name, sink_size));
1265   }
1266   return sink_size;
1267 }
1268 
// Runs |graph| through GE in ref (zero-copy) mode: the GE input/output tensors
// alias the framework's device addresses, so no host<->device copies happen
// here. Returns true on success; throws on GE or runtime failures.
bool GeGraphExecutor::RunGraphRefMode(const FuncGraphPtr &graph, const std::vector<tensor::Tensor> &inputs) {
  MS_EXCEPTION_IF_NULL(graph);
  auto graph_name = GetGraphName(graph);
  // Run the data-init subgraph first if it exists and the sink size changed.
  RunInitGraph(graph_name);
  MS_LOG(INFO) << "GE run graph start in ref mode, graph: " << graph_name << ".";
  (void)ResManager()->BindDeviceToCurrentThread(false);

  // call ge rungraph
  KernelGraphPtr kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
  transform::RunOptions run_options;
  run_options.name = graph_name;
  auto graph_runner = transform::GetGraphRunner();
  if (graph_runner == nullptr) {
    MS_LOG(EXCEPTION) << "Can not found GraphRunner.";
  }

  // GE tensors wrapping the graph's input/output device addresses (zero copy).
  std::vector<GeTensor> ge_inputs = GenerateInputGeTensor(kg);
  std::vector<GeTensor> ge_outputs = GenerateOutputGeTensor(kg);

  bool is_dynamic_shape = kg->is_dynamic_shape();
  if (IsMemoryPoolRecycle() && !is_dynamic_shape) {
    // Static-shape graph with memory-pool recycling: verify static + feature
    // memory fits the HBM budget before launching, and record the peak usage.
    auto max_static_memory_size = ResManager()->GetMaxUsedMemorySize();
    auto iter = feature_memorys.find(graph_name);
    if (iter == feature_memorys.end()) {
      MS_LOG(EXCEPTION) << "Graph " << graph_name << " feature memory not found.";
    }
    auto feature_memory_size = iter->second;
    if (feature_memory_size != 0) {
      size_t total_memory_size = max_static_memory_size + feature_memory_size;
      size_t max_hbm_memory_size = static_cast<size_t>(AscendMemAdapter::GetInstance().GetMsUsedHbmSize());
      AscendMemAdapter::GetInstance().UpdateActualPeakMemory(total_memory_size);
      UpdateFMTracker(feature_memory_size, graph_name);
      if (common::IsNeedMemoryStatistic()) {
        MS_LOG(WARNING) << "Now Memory Status, graph: " << graph_name
                        << ", max_static_memory_size: " << max_static_memory_size
                        << ", feature_memory_size: " << feature_memory_size
                        << ", max_hbm_memory_size: " << max_hbm_memory_size;
      }
      if (total_memory_size > max_hbm_memory_size) {
        MS_LOG(EXCEPTION) << "Memory pool not enough, graph: " << graph_name
                          << ", max_static_memory_size: " << max_static_memory_size
                          << ", feature_memory_size: " << feature_memory_size
                          << ", max_hbm_memory_size: " << max_hbm_memory_size;
      }
    }
  }

  {
    // Release GIL before calling into (potentially long-running) C++ code
    GilReleaseWithCheck gil_release;
    MS_LOG(INFO) << "Run graph begin, inputs size is: " << inputs.size() << ", " << graph_name;
    if (IsNeedNotifyTTP(graph)) {
      // MindIO/TTP: a ref (weight-updating) graph is about to run; sync the
      // stream first so the notification reflects the actual device state.
      MS_LOG(INFO) << "Found optimizer sub graph and send event to mindio";
      auto sync_ret = ResManager()->SyncStream();
      if (!sync_ret) {
        MS_LOG(EXCEPTION) << "Sync stream failed";
      } else {
        mindio::MindIOAdapter::GetInstance()->NotifyStartUpdatingOs();
      }
    }
    transform::Status ret =
      transform::RunGraphWithStreamAsync(graph_runner, run_options, ResManager()->GetStream(), ge_inputs, &ge_outputs);
    if (ret != transform::Status::SUCCESS) {
      MS_LOG(EXCEPTION) << "Exec graph failed";
    }
  }
  if (is_dynamic_shape) {
    // Dynamic shape: output addresses/shapes are only known after execution,
    // so pull them back from the GE tensors and wait for the stream.
    auto graph_outputs = common::AnfAlgo::GetAllOutputWithIndex(graph->output());
    SetDynamicOutputs(graph_outputs, &ge_outputs, ResManager());
    auto sync_ret = ResManager()->SyncStream();
    if (!sync_ret) {
      MS_LOG(EXCEPTION) << "Sync stream failed";
    }
  }
  ClearForwardOutputAddress(kg, device_context_);
  return true;
}
1346 
DoAsyncCkpt(const FuncGraphPtr & graph)1347 void GeGraphExecutor::DoAsyncCkpt(const FuncGraphPtr &graph) {
1348   MS_EXCEPTION_IF_NULL(graph);
1349   auto kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
1350   auto ms_context = MsContext::GetInstance();
1351   MS_EXCEPTION_IF_NULL(ms_context);
1352   auto env = common::GetEnv("MS_ENABLE_CKPT_D2H_ASYNC");
1353   if (env == "1" && ms_context->get_param<bool>(MS_CTX_NEED_CKPT) && kg != nullptr) {
1354     auto cur_step = ms_context->get_param<int>(MS_CTX_CUR_STEP_NUM);
1355     auto save_steps = ms_context->get_param<int>(MS_CTX_SAVE_CKPT_STEPS);
1356     auto last_triggered_step = ms_context->get_param<int>(MS_CTX_LAST_TRIGGERED_STEP);
1357     MS_LOG(DEBUG) << "cur_step:" << cur_step << ", save_steps: " << save_steps
1358                   << ", last_triggered_step:" << last_triggered_step;
1359     if (cur_step >= (last_triggered_step + save_steps)) {
1360       if (SkipOrResetCopyAction()) {
1361         MS_LOG(INFO) << "Enable async d2h copy";
1362         SavePrevStepWeight(kg->GetRootWeights(), ResManager()->GetCopyDataStream());
1363       }
1364       if (kg->has_attr(kIsRefGraph) && GetValue<bool>(kg->get_attr(kIsRefGraph)) && SkipOrResetSyncAction()) {
1365         MS_LOG(INFO) << "Ref graph sync once action";
1366         SyncCopyStream(ResManager()->GetCopyDataStream());
1367       }
1368     }
1369   }
1370 }
1371 
IsNeedNotifyTTP(const FuncGraphPtr & graph)1372 bool GeGraphExecutor::IsNeedNotifyTTP(const FuncGraphPtr &graph) {
1373   MS_EXCEPTION_IF_NULL(graph);
1374   auto kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
1375   if (mindio::MindIOAdapter::GetInstance()->IsEnable() && kg != nullptr && kg->has_attr(kIsRefGraph) &&
1376       GetValue<bool>(kg->get_attr(kIsRefGraph))) {
1377     return true;
1378   }
1379   return false;
1380 }
1381 
// Entry point for running a compiled graph. Dispatches to the zero-copy
// ref-mode path when enabled; otherwise syncs inputs to host, runs the graph
// through GE asynchronously, and writes GE's results back to the framework.
bool GeGraphExecutor::RunGraph(const FuncGraphPtr &graph, const std::vector<tensor::Tensor> &inputs,
                               std::vector<tensor::Tensor> *outputs,
                               const std::map<string, string> & /* compile_options */) {
  MS_EXCEPTION_IF_NULL(graph);
  auto graph_name = GetGraphName(graph);
  profiler::CollectHostInfo("Ascend", "RunGraph", "GeRunGraph_" + graph_name, 1, 0, kCollectHostInfoStart);
  // Possibly kick off the async device-to-host checkpoint copy for this step.
  DoAsyncCkpt(graph);
  if (IsEnableRefMode()) {
    if (!RunGraphRefMode(graph, inputs)) {
      profiler::CollectHostInfo("Ascend", "RunGraph", "GeRunGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
      return false;
    }
  } else {
    MS_LOG(INFO) << "GE run graph start, graph: " << graph_name << ".";
    (void)ResManager()->BindDeviceToCurrentThread(false);
    // copy input from device to host
    const auto &cur_inputs = graph->get_inputs();
    std::vector<tensor::TensorPtr> input_tensors;
    for (const auto &input : cur_inputs) {
      MS_EXCEPTION_IF_NULL(input);
      auto output_addr = AnfAlgo::GetMutableOutputAddr(input, 0);
      auto shapes = trans::GetRuntimePaddingShape(input, 0);
      auto host_type = common::AnfAlgo::GetOutputInferDataType(input, 0);
      auto tensor = std::make_shared<tensor::Tensor>(host_type, shapes);
      MS_EXCEPTION_IF_NULL(tensor);
      tensor->set_device_address(output_addr, false);
      tensor->data_sync();  // blocks until device data is visible on host
      (void)input_tensors.emplace_back(std::move(tensor));
    }
    auto ge_inputs = transform::ConvertInputTensors(input_tensors, kOpFormat_NCHW);

    // call ge rungraph
    KernelGraphPtr kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
    if (kg != nullptr) {
      graph_name = kg->GetFuncGraph()->ToString();
    }
    transform::RunOptions run_options;
    run_options.name = graph_name;
    auto graph_runner = transform::GetGraphRunner();
    if (graph_runner == nullptr) {
      MS_LOG(EXCEPTION) << "Can not found GraphRunner.";
    }

    AnfNodePtr output = graph->get_return()->input(1);
    MS_EXCEPTION_IF_NULL(output);
    std::vector<TypeId> me_types;
    auto output_c = output->cast<CNodePtr>()->abstract();
    // get output node data types
    GetMeRetDataType(output_c, &me_types);
    std::vector<transform::GeTensorPtr> ge_outputs;
    {
      // Release GIL before calling into (potentially long-running) C++ code
      GilReleaseWithCheck gil_release;
      MS_LOG(DEBUG) << "Run graph begin, inputs size is: " << inputs.size();
      transform::Status ret = transform::RunGraphAsync(graph_runner, run_options, ge_inputs, &ge_outputs);
      MS_LOG(DEBUG) << "Run graph finish, outputs size is: " << ge_outputs.size();
      if (ret == transform::Status::NOT_FOUND) {
        // A missing graph is treated as a successful no-op run, not an error.
        MS_LOG(WARNING) << "The Graph[" << graph_name << "] is not found, skip run it.";
        profiler::CollectHostInfo("Ascend", "RunGraph", "GeRunGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
        return true;
      } else if (ret != transform::Status::SUCCESS) {
        MS_LOG(EXCEPTION) << "Exec graph failed";
      }
    }
    auto no_output = common::AnfAlgo::IsNoOuputNode(output);
    if (!no_output) {
      // Sanity-check GE's output count against both the inferred types and the
      // graph's declared outputs before writing results back.
      if (me_types.size() != ge_outputs.size()) {
        MS_LOG(EXCEPTION) << "Invalid output size, me_type's size " << me_types.size() << " tensor size "
                          << ge_outputs.size();
      }
      // copy output from host to device
      auto graph_outputs = common::AnfAlgo::GetAllOutputWithIndex(graph->output());
      if (graph_outputs.size() != ge_outputs.size()) {
        MS_LOG(EXCEPTION) << "Invalid output size, graph's size " << graph_outputs.size() << " tensor size "
                          << ge_outputs.size();
      }
      SetOutputs(graph_outputs, ge_outputs, me_types);
    }
  }
  if (graph->has_flag(transform::kGraphFlagHasGetNext)) {
    // GetNext graphs consume dataset config; reset so the next run starts clean.
    MS_LOG(DEBUG) << "Reset ConfigManager, graph: " << graph_name;
    ConfigManager::GetInstance().ResetConfig();
    ConfigManager::GetInstance().ResetIterNum();
  }
  profiler::CollectHostInfo("Ascend", "RunGraph", "GeRunGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
  MS_LOG(INFO) << "GE run graph end.";
  return true;
}
1470 
// Converts |anf_graph| into a GE dataflow graph via AddDFGraph, optionally
// dumping IR before and after the conversion. Returns the (unchanged) input
// graph on success, or nullptr if conversion fails.
// Note: the original code had two branches both returning anf_graph
// (export_air and not); they are collapsed into a single return below.
FuncGraphPtr GeGraphExecutor::BuildDFGraph(const FuncGraphPtr &anf_graph,
                                           const transform::TensorOrderMap &init_inputs_map, bool export_air) {
  MS_EXCEPTION_IF_NULL(anf_graph);
#ifdef ENABLE_DUMP_IR
  auto context = MsContext::GetInstance();
  MS_EXCEPTION_IF_NULL(context);
  if (context->CanDump(kIntroductory)) {
    if (context->CanDump(kFully)) {
      draw::Draw("anf_graph_before_build_df_graph.dot", anf_graph);  // for debug
    }
    DumpIR("anf_graph_before_build_df_graph.ir", anf_graph, true, kWholeStack);
  }
#endif

  if (!AddDFGraph(anf_graph, init_inputs_map, export_air)) {
    MS_LOG(ERROR) << "GenConvertor failed";
    return nullptr;
  }

#ifdef ENABLE_DUMP_IR
  if (context->CanDump(kIntroductory)) {
    if (context->CanDump(kFully)) {
      draw::Draw("anf_graph_after_build_df_graph.dot", anf_graph);  // for debug
    }
    DumpIR("anf_graph_after_build_df_graph.ir", anf_graph, true, kWholeStack);
  }
#endif

  // export air can't use session->AddGraph, it will cause atc error.
  return anf_graph;
}
1506 
GetNodeInfo(const AnfNodeWeakPtr & node)1507 static inline std::string GetNodeInfo(const AnfNodeWeakPtr &node) {
1508   auto input_node = node.lock();
1509   MS_EXCEPTION_IF_NULL(input_node);
1510   return input_node->DebugString();
1511 }
1512 
GenerateInputGeTensor(const KernelGraphPtr & kernel_graph) const1513 std::vector<GeTensor> GeGraphExecutor::GenerateInputGeTensor(const KernelGraphPtr &kernel_graph) const {
1514   MS_EXCEPTION_IF_NULL(kernel_graph);
1515   std::vector<GeTensor> ge_inputs;
1516   auto iter = input_datas_.find(kernel_graph.get());
1517   if (iter == input_datas_.end()) {
1518     return ge_inputs;
1519   }
1520   bool is_dynamic_shape = kernel_graph->is_dynamic_shape();
1521   const auto &input_datas = iter->second.ge_inputs;
1522   ge_inputs = input_datas;
1523   for (size_t i = 0; i < iter->second.device_addrs.size(); ++i) {
1524     auto output_addr = iter->second.device_addrs[i];
1525     MS_EXCEPTION_IF_NULL(output_addr);
1526     if (is_dynamic_shape) {
1527       auto ge_tensor_desc = transform::TransformUtil::GetGeTensorDesc(output_addr->kernel_tensor()->GetShapeVector(),
1528                                                                       output_addr->type_id(), output_addr->format());
1529       MS_EXCEPTION_IF_NULL(ge_tensor_desc);
1530       ge_tensor_desc->SetPlacement(::ge::kPlacementDevice);
1531       (void)ge_inputs[i].SetTensorDesc(*ge_tensor_desc);
1532     }
1533     auto node_output_addr = output_addr->GetMutablePtr();
1534     if (node_output_addr == nullptr) {
1535       auto input_node = iter->second.need_update_input[i].first.lock();
1536       MS_EXCEPTION_IF_NULL(input_node);
1537       // alloc static memory for unused inputs
1538       // error in ge when set nullptr into ge tensor
1539       std::vector<size_t> shape = Convert2SizeT(common::AnfAlgo::GetOutputInferShape(input_node, 0));
1540       size_t type_size = GetTypeByte(TypeIdToType(common::AnfAlgo::GetOutputInferDataType(input_node, 0)));
1541       size_t memory_size = std::accumulate(shape.begin(), shape.end(), type_size, std::multiplies<size_t>{});
1542       MS_EXCEPTION_IF_NULL(ResManager());
1543       auto memory = ResManager()->AllocateMemory(memory_size);
1544       output_addr->set_ptr(memory);
1545       output_addr->SetSize(memory_size);
1546       if (common::IsNeedProfileMemory()) {
1547         MS_LOG(WARNING) << "Need Profile Memory, alloc type: UnusedInput, size:" << memory_size
1548                         << ", graph: " << kernel_graph->ToString() << ", node: " << input_node->fullname_with_scope()
1549                         << ", device address addr: " << memory;
1550       }
1551       UpdateTracker("UnusedInput", input_node->fullname_with_scope(), kernel_graph->ToString(), memory_size, memory,
1552                     device::tracker::MemType::kOther);
1553     }
1554     MS_LOG(INFO) << "[ZeroCopy] For Graph " << kernel_graph->ToString() << ", update input "
1555                  << GetNodeInfo(iter->second.need_update_input[i].first) << " address to "
1556                  << output_addr->GetMutablePtr() << ", shape:" << output_addr->kernel_tensor()->GetShapeVector()
1557                  << ", type: " << TypeIdToString(output_addr->type_id()) << ", format: " << output_addr->format()
1558                  << ", memory size: " << output_addr->GetSize();
1559     if (node_output_addr != ge_inputs[i].GetData() || output_addr->GetSize() != ge_inputs[i].GetSize()) {
1560       (void)ge_inputs[i].SetData(static_cast<uint8_t *>(node_output_addr), output_addr->GetSize(), [](void *) {});
1561     }
1562   }
1563   return ge_inputs;
1564 }
1565 
// Builds the GE output tensors for |kernel_graph| by aliasing each output's
// device address (zero copy). For dynamic-shape graphs the data pointers are
// left null: GE allocates outputs itself and they are fetched afterwards (see
// SetDynamicOutputs).
std::vector<GeTensor> GeGraphExecutor::GenerateOutputGeTensor(const KernelGraphPtr &kernel_graph) const {
  MS_EXCEPTION_IF_NULL(kernel_graph);
  std::vector<GeTensor> ge_outputs;
  auto iter = output_datas_.find(kernel_graph.get());
  if (iter == output_datas_.end()) {
    return ge_outputs;
  }
  const auto &output_datas = iter->second.ge_outputs;
  ge_outputs = output_datas;

  bool is_dynamic_shape = kernel_graph->is_dynamic_shape();
  for (size_t idx = 0; idx < iter->second.device_addrs.size(); ++idx) {
    if (is_dynamic_shape) {
      // Dynamic shape: let GE own the buffer; the real address is read back
      // after execution.
      ge_outputs[idx].SetData(nullptr, 0U, [](void *) {});
      continue;
    }
    auto output_node = iter->second.graph_outputs[idx].first.lock();
    auto index = iter->second.graph_outputs[idx].second;
    MS_EXCEPTION_IF_NULL(output_node);
    MS_EXCEPTION_IF_CHECK_FAIL(
      idx < ge_outputs.size(),
      "GenerateOutputGeTensor idx is greater equal than ge_outputs size, idx: " + std::to_string(idx) +
        ", ge outputs size: " + std::to_string(ge_outputs.size()) + ", kernel graph: " + kernel_graph->ToString());
    auto output_device_addr = iter->second.device_addrs[idx];
    auto node_output_device_addr = output_device_addr->GetMutablePtr();
    MS_LOG(INFO) << "Output addr " << node_output_device_addr;
    if (node_output_device_addr == nullptr) {
      // A null static-shape output address means allocation failed upstream.
      MS_LOG(EXCEPTION) << "Output " << output_node->fullname_with_scope() << ", index: " << index
                        << " address is nullptr, kernel graph: " << kernel_graph->ToString()
                        << ", addr memory size: " << output_device_addr->GetSize()
                        << "\n Maybe memory is not enough, memory statistics:"
                        << AscendMemAdapter::GetInstance().DevMemStatistics();
    }
    MS_LOG(INFO) << "[ZeroCopy] For Graph " << kernel_graph->ToString() << ", update output "
                 << output_node->DebugString() << " out_idx " << index << " address to "
                 << output_device_addr->GetMutablePtr()
                 << ", shape:" << output_device_addr->kernel_tensor()->GetShapeVector()
                 << ", type: " << TypeIdToString(output_device_addr->type_id())
                 << ", format: " << output_device_addr->format() << ", memory size: " << output_device_addr->GetSize();
    // Rebind only when the address or size differs from the cached GE tensor.
    if (node_output_device_addr != ge_outputs[idx].GetData() ||
        output_device_addr->GetSize() != ge_outputs[idx].GetSize()) {
      (void)ge_outputs[idx].SetData(reinterpret_cast<uint8_t *>(node_output_device_addr), output_device_addr->GetSize(),
                                    [](void *) {});
    }
  }
  return ge_outputs;
}
1613 
RunInitGraph(const std::string & graph_name)1614 void GeGraphExecutor::RunInitGraph(const std::string &graph_name) {
1615   transform::RunOptions run_options;
1616   run_options.name = "init_subgraph." + graph_name;
1617   if (transform::GetGraphByName(run_options.name) == nullptr) {
1618     MS_LOG(INFO) << "Can not find " << run_options.name << " sub graph, don't need data init subgraph in INFER mode.";
1619     return;
1620   }
1621   auto graph_runner = transform::GetGraphRunner();
1622   if (graph_runner == nullptr) {
1623     MS_LOG(EXCEPTION) << "Can not found GraphRunner.";
1624   }
1625 
1626   auto cur_sink_size = CurGraphSinkSize(graph_name);
1627   if (pre_sink_size_ == cur_sink_size) {
1628     return;
1629   }
1630   pre_sink_size_ = cur_sink_size;
1631   MS_LOG(INFO) << "Start run init graph: " << run_options.name << ", sink size:" << cur_sink_size;
1632   std::vector<transform::GeTensorPtr> ge_outputs;
1633   std::vector<transform::GeTensorPtr> ge_tensors;
1634   {
1635     // Release GIL before calling into (potentially long-running) C++ code
1636     GilReleaseWithCheck gil_release;
1637     transform::Status ret = transform::RunGraph(graph_runner, run_options, ge_tensors, &ge_outputs);
1638     if (ret != transform::Status::SUCCESS) {
1639       MS_LOG(EXCEPTION) << "Exec " << run_options.name << " graph failed.";
1640     }
1641     MS_LOG(INFO) << "Exec " << run_options.name << " graph success.";
1642   }
1643 }
1644 }  // namespace ascend
1645 }  // namespace device
1646 }  // namespace mindspore
1647