1 /**
2 * Copyright 2023-2024 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "plugin/device/ascend/hal/hardware/ge_graph_executor.h"
18 #include <tuple>
19 #include <functional>
20 #include <algorithm>
21 #include <utility>
22 #include <map>
23 #include <set>
24 #include <sstream>
25 #include "include/transform/graph_ir/types.h"
26 #include "include/transform/graph_ir/utils.h"
27 #include "include/common/utils/utils.h"
28 #include "include/common/debug/draw.h"
29 #include "include/common/debug/anf_ir_dump.h"
30 #include "include/common/utils/scoped_long_running.h"
31 #include "abstract/abstract_value.h"
32 #include "include/backend/kernel_graph.h"
33 #include "plugin/device/cpu/hal/device/cpu_device_address.h"
34 #include "runtime/device/ms_device_shape_transfer.h"
35 #include "runtime/device/device_address_utils.h"
36 #include "plugin/device/cpu/hal/device/cpu_memory_manager.h"
37 #include "plugin/device/ascend/hal/hccl_adapter/hccl_adapter.h"
38 #include "plugin/device/ascend/optimizer/ge_optimization.h"
39 #include "plugin/device/ascend/hal/common/ascend_utils.h"
40 #include "plugin/device/ascend/hal/hardware/ge_device_res_manager.h"
41 #include "plugin/device/ascend/hal/hardware/ge_utils.h"
42 #include "plugin/device/ascend/hal/device/ascend_memory_adapter.h"
43 #include "plugin/device/ascend/hal/device/ascend_device_address.h"
44 #include "plugin/device/ascend/hal/hardware/ge_graph_optimization.h"
45 #include "plugin/device/ascend/hal/device/ascend_device_synchronizer.h"
46 #include "include/backend/debug/profiler/profiling.h"
47 #include "include/backend/mem_reuse/mem_tracker.h"
48 #include "ge/ge_graph_compile_summary.h"
49 #include "kernel/kernel_build_info.h"
50 #include "op_proto/inc/array_ops.h"
51 #include "ops/nn_op_name.h"
52 #include "ops/array_ops.h"
53 #include "pybind_api/gil_scoped_long_running.h"
54 #include "include/common/utils/compile_cache_context.h"
55 using InputNameAndType = std::vector<std::pair<std::string, bool>>;
56 using Data = ::ge::op::Data;
57 using RefData = ::ge::op::RefData;
58
59 namespace mindspore {
60 namespace device {
61 namespace ascend {
62 namespace {
63 const std::set<std::string> kIgnoreGEShapeOps = {kSoftMarginLossOpName};
64 mindspore::HashMap<std::string, size_t> feature_memorys;
65 mindspore::HashMap<std::string, size_t> streams;
66 constexpr size_t kNeedRecycleOutput = 5;
67 constexpr int kCollectHostInfoStart = 0;
68 constexpr int kCollectHostInfoEnd = 1;
69
GetMeRetDataType(const AbstractBasePtr & cnode_data,std::vector<TypeId> * me_types)70 void GetMeRetDataType(const AbstractBasePtr &cnode_data, std::vector<TypeId> *me_types) {
71 MS_EXCEPTION_IF_NULL(cnode_data);
72
73 if (cnode_data->isa<abstract::AbstractNone>()) {
74 return;
75 }
76
77 if (cnode_data->isa<abstract::AbstractTensor>()) {
78 TypeId me_type = cnode_data->BuildType()->type_id();
79 if (me_type == kObjectTypeTensorType) {
80 me_type = dyn_cast<TensorType>(cnode_data->BuildType())->element()->type_id();
81 (void)me_types->emplace_back(me_type);
82 }
83 return;
84 }
85 if (cnode_data->isa<abstract::AbstractScalar>()) {
86 TypeId me_type = cnode_data->BuildType()->type_id();
87 (void)me_types->emplace_back(me_type);
88 return;
89 }
90 auto abstract_tuple = cnode_data->cast<abstract::AbstractTuplePtr>();
91 MS_EXCEPTION_IF_NULL(abstract_tuple);
92 auto elements = abstract_tuple->elements();
93 for (size_t i = 0; i < abstract_tuple->size(); ++i) {
94 GetMeRetDataType(elements[i], me_types);
95 }
96 }
97
GetDefaultParams(const FuncGraphPtr & anf_graph,std::map<std::string,ShapeVector> * origin_shape)98 transform::TensorOrderMap GetDefaultParams(const FuncGraphPtr &anf_graph,
99 std::map<std::string, ShapeVector> *origin_shape) {
100 MS_EXCEPTION_IF_NULL(anf_graph);
101 transform::TensorOrderMap res;
102 for (auto &anf_node : anf_graph->parameters()) {
103 MS_EXCEPTION_IF_NULL(anf_node);
104 auto para = anf_node->cast<ParameterPtr>();
105 MS_EXCEPTION_IF_NULL(para);
106 if (para->has_default()) {
107 auto value = para->default_param();
108 MS_EXCEPTION_IF_NULL(value);
109 auto tensor = value->cast<std::shared_ptr<tensor::Tensor>>();
110 MS_EXCEPTION_IF_NULL(tensor);
111 origin_shape->emplace(para->name(), tensor->shape_c());
112 // need ref shape when auto parallel
113 auto build_shape = para->abstract()->BuildShape();
114 if (build_shape != nullptr) {
115 (void)tensor->MetaTensor::set_shape(build_shape->cast<abstract::ShapePtr>()->shape());
116 MS_LOG(INFO) << "ref abstract Parameter: " << para->name() << ", tensor: " << tensor->ToString();
117 }
118 res.emplace(para->name(), tensor);
119 MS_LOG(DEBUG) << "Parameter " << para->name() << " has default value.";
120 }
121 }
122 return res;
123 }
124
RevertOriginShape(const KernelGraphPtr & anf_graph,const std::map<std::string,ShapeVector> & origin_shape)125 void RevertOriginShape(const KernelGraphPtr &anf_graph, const std::map<std::string, ShapeVector> &origin_shape) {
126 MS_EXCEPTION_IF_NULL(anf_graph);
127 transform::TensorOrderMap res;
128 for (auto &anf_node : anf_graph->parameters()) {
129 MS_EXCEPTION_IF_NULL(anf_node);
130 auto para = anf_node->cast<ParameterPtr>();
131 MS_EXCEPTION_IF_NULL(para);
132 if (para->has_default()) {
133 auto it = origin_shape.find(para->name());
134 if (it == origin_shape.end()) {
135 MS_LOG(ERROR) << "Failed to find input " << para->name() << " in input_shape " << origin_shape;
136 continue;
137 }
138 auto value = para->default_param();
139 MS_EXCEPTION_IF_NULL(value);
140 auto tensor = value->cast<std::shared_ptr<tensor::Tensor>>();
141 (void)tensor->MetaTensor::set_shape(it->second);
142 MS_LOG(INFO) << "ref abstract Parameter: " << para->name() << ", tensor: " << tensor->ToString();
143 }
144 }
145 }
146
GetInputTensors(const FuncGraphPtr & anf_graph)147 std::vector<transform::GeTensorPtr> GetInputTensors(const FuncGraphPtr &anf_graph) {
148 MS_EXCEPTION_IF_NULL(anf_graph);
149 transform::TensorOrderMap init_input_map;
150 std::vector<tensor::TensorPtr> init_input;
151 for (auto &anf_node : anf_graph->parameters()) {
152 MS_EXCEPTION_IF_NULL(anf_node);
153 auto para = anf_node->cast<ParameterPtr>();
154 MS_EXCEPTION_IF_NULL(para);
155 if (para->has_default()) {
156 auto value = para->default_param();
157 MS_EXCEPTION_IF_NULL(value);
158 (void)init_input_map.emplace(para->name(), value->cast<std::shared_ptr<tensor::Tensor>>());
159 }
160 }
161 (void)std::transform(init_input_map.begin(), init_input_map.end(), std::back_inserter(init_input),
162 [](const std::pair<std::string, tensor::TensorPtr> &item) { return item.second; });
163 return transform::ConvertInputTensors(init_input, kOpFormat_NCHW);
164 }
165
RunGEInitGraph(const FuncGraphPtr & anf_graph)166 void RunGEInitGraph(const FuncGraphPtr &anf_graph) {
167 MS_LOG(DEBUG) << "ExecInitGraph start.";
168 MS_EXCEPTION_IF_NULL(anf_graph);
169
170 transform::RunOptions run_options;
171 run_options.name = "init_subgraph." + anf_graph->ToString();
172
173 auto graph_runner = transform::CheckAndGetGraphRunner(run_options);
174 if (graph_runner == nullptr) {
175 return;
176 }
177
178 std::vector<transform::GeTensorPtr> ge_tensors;
179 std::vector<transform::GeTensorPtr> ge_outputs;
180 {
181 // Release GIL before calling into (potentially long-running) C++ code
182 GilReleaseWithCheck gil_release;
183 transform::Status ret = transform::RunGraph(graph_runner, run_options, ge_tensors, &ge_outputs);
184 if (ret != transform::Status::SUCCESS) {
185 MS_LOG(EXCEPTION) << "Exec " << run_options.name << " graph failed.";
186 }
187
188 MS_LOG(DEBUG) << "Exec " << run_options.name << " graph success.";
189
190 if ((ConfigManager::GetInstance().parallel_strategy() == ParallelStrategy::DISTRIBUTION) &&
191 (transform::GetGraphByName(BROADCAST_GRAPH_NAME) != nullptr)) {
192 run_options.name = BROADCAST_GRAPH_NAME;
193 ge_tensors = GetInputTensors(anf_graph);
194 ret = transform::RunGraph(graph_runner, run_options, ge_tensors, &ge_outputs);
195 if (ret != transform::Status::SUCCESS) {
196 MS_LOG(EXCEPTION) << "Exec BROADCAST_GRAPH_NAME failed.";
197 }
198 MS_LOG(DEBUG) << "Exec broadcast graph success.";
199 }
200 }
201 }
202
UpdateOutputNodeShape(const AnfNodePtr & node,size_t index,TypeId output_type,const ShapeVector & output_shape)203 void UpdateOutputNodeShape(const AnfNodePtr &node, size_t index, TypeId output_type, const ShapeVector &output_shape) {
204 MS_EXCEPTION_IF_NULL(node);
205 std::string name;
206 if (node->isa<CNode>()) {
207 name = common::AnfAlgo::GetCNodeName(node);
208 }
209 size_t total_output_num = AnfAlgo::GetOutputElementNum(node);
210 if (index >= total_output_num) {
211 MS_LOG_WITH_NODE(EXCEPTION, node) << "Invalid output index " << index << ", node " << node->fullname_with_scope()
212 << " has " << total_output_num << " outputs.";
213 }
214 std::vector<TypeId> types = {};
215 std::vector<ShapeVector> shapes = {};
216 for (size_t i = 0; i < total_output_num; ++i) {
217 if (i == index && kIgnoreGEShapeOps.count(name) == 0) {
218 types.push_back(output_type);
219 shapes.push_back(output_shape);
220 } else {
221 types.push_back(common::AnfAlgo::GetOutputInferDataType(node, i));
222 (void)shapes.emplace_back(common::AnfAlgo::GetOutputInferShape(node, i));
223 }
224 }
225 common::AnfAlgo::SetOutputInferTypeAndShape(types, shapes, node.get());
226 }
227
SetDynamicShapeAttr(const KernelGraphPtr & kernel_graph)228 void SetDynamicShapeAttr(const KernelGraphPtr &kernel_graph) {
229 MS_EXCEPTION_IF_NULL(kernel_graph);
230 auto nodes = TopoSort(kernel_graph->output());
231 for (auto &node : nodes) {
232 if (common::AnfAlgo::IsDynamicShape(node)) {
233 MS_LOG(DEBUG) << "Set Dynamic Shape Attr to Node : " << node->fullname_with_scope();
234 kernel_graph->SetGraphDynamicAttr(true);
235 return;
236 }
237 }
238 }
239
EnableGraphInputZeroCopy(const KernelGraphPtr & graph)240 void EnableGraphInputZeroCopy(const KernelGraphPtr &graph) {
241 MS_EXCEPTION_IF_NULL(graph);
242 // Zero copy is only enabled for PyNative and Subgraph sink.
243 if ((!graph->has_flag(kFlagPyNativeRunInGraph) && !graph->has_flag(kFlagEnableZeroCopyInGraph)) ||
244 !graph->is_graph_run_mode()) {
245 return;
246 }
247 const auto &input_nodes = graph->input_nodes();
248 for (const auto &input : input_nodes) {
249 MS_EXCEPTION_IF_NULL(input);
250 if (AnfAlgo::OutputAddrExist(input, 0)) {
251 auto input_address = AnfAlgo::GetMutableOutputAddr(input, 0, false);
252 MS_EXCEPTION_IF_NULL(input_address);
253 input_address->set_is_ptr_persisted(false);
254 input_address->ClearFlag(device::kDeviceAddressFlagNotUsed);
255 MS_LOG(INFO) << "Enable zero copy for input " << input->DebugString();
256 }
257 }
258 }
259
EnableGraphOutputZeroCopy(const KernelGraphPtr & graph)260 void EnableGraphOutputZeroCopy(const KernelGraphPtr &graph) {
261 MS_LOG(DEBUG) << "EnableGraphOutputZeroCopy start";
262 MS_EXCEPTION_IF_NULL(graph);
263 if ((!graph->has_flag(kFlagEnableZeroCopyInGraph)) || !graph->is_graph_run_mode()) {
264 MS_LOG(DEBUG) << "EnableGraphOutputZeroCopy start return";
265 return;
266 }
267 // Zero copy is only enabled for subgraph sink.
268 auto outputs = common::AnfAlgo::GetAllOutputWithIndex(graph->output());
269 for (const auto &output : outputs) {
270 const auto &node_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
271 const auto &node = node_with_index.first;
272 const auto &index = node_with_index.second;
273 MS_EXCEPTION_IF_NULL(node);
274 MS_LOG(DEBUG) << "EnableGraphOutputZeroCopy check node:" << node->DebugString();
275 if (node->isa<CNode>() && AnfAlgo::OutputAddrExist(node, index)) {
276 auto device_address = AnfAlgo::GetMutableOutputAddr(node, index, false);
277 MS_EXCEPTION_IF_NULL(device_address);
278 device_address->set_is_ptr_persisted(false);
279 MS_LOG(DEBUG) << "Disable ptr persisted in output node:" << node->DebugString() << " index:" << index
280 << " address:" << device_address << " for graph:" << graph->ToString();
281 }
282 }
283 }
284
285 struct GraphSummary {
286 size_t const_memory_size = 0;
287 size_t feature_memory_size = 0;
288 bool is_feature_memory_refreshable = false;
289 size_t stream_num = 0;
290 size_t event_num = 0;
291 std::vector<ShapeVector> output_shapes = {};
292 std::vector<ge::DataType> output_dtypes = {};
293 // pair<input_index, output_index>
294 std::vector<std::pair<uint32_t, uint32_t>> io_indexes;
295 bool is_static = false;
296
297 GraphSummary() = default;
GraphSummarymindspore::device::ascend::__anon360b53500111::GraphSummary298 explicit GraphSummary(const ::ge::CompiledGraphSummaryPtr &graph_summary) {
299 MS_EXCEPTION_IF_NULL(graph_summary);
300 is_static = graph_summary->IsStatic();
301 if (is_static) {
302 ::ge::graphStatus status;
303 status = graph_summary->GetConstMemorySize(const_memory_size);
304 if (status != ::ge::GRAPH_SUCCESS) {
305 MS_LOG(EXCEPTION) << "GetConstMemorySize failed, status = " << status;
306 }
307 status = graph_summary->GetFeatureMemorySize(feature_memory_size);
308 if (status != ::ge::GRAPH_SUCCESS) {
309 MS_LOG(EXCEPTION) << "GetFeatureMemorySize failed, status = " << status;
310 }
311 status = graph_summary->GetFeatureMemoryBaseRefreshable(is_feature_memory_refreshable);
312 if (status != ::ge::GRAPH_SUCCESS) {
313 MS_LOG(EXCEPTION) << "GetFeatureMemoryBaseRefreshable failed, status = " << status;
314 }
315 status = graph_summary->GetStreamNum(stream_num);
316 if (status != ::ge::GRAPH_SUCCESS) {
317 MS_LOG(EXCEPTION) << "GetStreamNum failed, status = " << status;
318 }
319 status = graph_summary->GetEventNum(event_num);
320 if (status != ::ge::GRAPH_SUCCESS) {
321 MS_LOG(EXCEPTION) << "GetEventNum failed, status = " << status;
322 }
323 std::vector<::ge::Shape> ge_shapes;
324 status = graph_summary->GetOutputShapes(ge_shapes);
325 if (status != ::ge::GRAPH_SUCCESS) {
326 MS_LOG(EXCEPTION) << "GetOutputShapes failed, status = " << status;
327 }
328 (void)std::transform(ge_shapes.begin(), ge_shapes.end(), std::back_inserter(output_shapes),
329 [](const ::ge::Shape &ge_shape) -> ShapeVector { return ge_shape.GetDims(); });
330 if (graph_summary->GetOutputDtypes(output_dtypes) != ::ge::GRAPH_SUCCESS) {
331 MS_LOG(EXCEPTION) << "GetOutputDtypes failed, status = " << status
332 << ", maybe the execution mode is not as expected.";
333 }
334 if (graph_summary->GetIOIndexesWithSameAddr(io_indexes) != ::ge::GRAPH_SUCCESS) {
335 MS_LOG(EXCEPTION) << "GetIOIndexesWithSameAddr failed, status = " << status
336 << ", maybe the execution mode is not as expected.";
337 }
338 } else {
339 MS_LOG(WARNING) << "Graph is not static, maybe the execution mode is not as expected.";
340 }
341 }
342
ToStringmindspore::device::ascend::__anon360b53500111::GraphSummary343 std::string ToString() const {
344 std::stringstream ss;
345 ss << "const_memory_size[" << const_memory_size << "], feature_memory_size[" << feature_memory_size
346 << "], is_feature_memory_refreshable[" << is_feature_memory_refreshable << "], stream_num[" << stream_num
347 << "], event_num[" << event_num << "], output size[" << output_shapes.size() << "], is_static[" << is_static
348 << "]";
349 if (!output_shapes.empty()) {
350 if (output_shapes.size() != output_dtypes.size()) {
351 MS_LOG(WARNING) << "The output_dtypes size in summary is not equal to output_shapes size.";
352 }
353 for (size_t i = 0; i < output_shapes.size(); ++i) {
354 std::string shape_str = "[";
355 std::string dtype_str = "";
356 for (size_t j = 0; j < output_shapes[i].size(); ++j) {
357 if (j != output_shapes[i].size() - 1) {
358 shape_str += std::to_string(output_shapes[i][j]) + ",";
359 } else {
360 shape_str += std::to_string(output_shapes[i][j]) + "]";
361 }
362 }
363
364 if (output_shapes[i].empty()) {
365 shape_str = "[]";
366 }
367 if (i < output_dtypes.size()) {
368 dtype_str += "[";
369 dtype_str += TransGeDtypeToString(output_dtypes[i]);
370 dtype_str += "]";
371 }
372 if (dtype_str.empty()) {
373 ss << ", output[" << i << "] shape = " << shape_str;
374 } else {
375 ss << ", output[" << i << "] shape = " << shape_str << " dtype = " << dtype_str;
376 }
377 }
378 }
379 if (!io_indexes.empty()) {
380 std::string io_indexes_str = "[";
381 for (auto io_index : io_indexes) {
382 io_indexes_str += "[" + std::to_string(io_index.first) + "," + std::to_string(io_index.second) + "]";
383 }
384 io_indexes_str += "]";
385 ss << ", io_indexes: " << io_indexes_str;
386 }
387
388 return ss.str();
389 }
390
391 private:
TransGeDtypeToStringmindspore::device::ascend::__anon360b53500111::GraphSummary392 std::string TransGeDtypeToString(const transform::GeDataType dtype) const {
393 std::string dtype_str = "";
394 if (transform::ge_dtype_str_map.find(dtype) != transform::ge_dtype_str_map.end()) {
395 dtype_str = transform::ge_dtype_str_map[dtype];
396 }
397 return dtype_str;
398 }
399 };
400
FilterAllParameters(const KernelGraphPtr & kernel_graph)401 std::multimap<std::string, ParameterPtr> FilterAllParameters(const KernelGraphPtr &kernel_graph) {
402 MS_EXCEPTION_IF_NULL(kernel_graph);
403 std::multimap<std::string, ParameterPtr> ret;
404 std::vector<AnfNodePtr> todo = kernel_graph->input_nodes();
405 (void)todo.insert(todo.end(), kernel_graph->child_graph_result().begin(), kernel_graph->child_graph_result().end());
406 for (const auto &node : todo) {
407 MS_EXCEPTION_IF_NULL(node);
408 if (!node->isa<Parameter>()) {
409 continue;
410 }
411 auto parameter = node->cast<ParameterPtr>();
412 MS_EXCEPTION_IF_NULL(parameter);
413 std::string name = parameter->name();
414 (void)ret.emplace(name, parameter);
415 }
416 return ret;
417 }
418
SetParameterKernelInfo(const AnfNodePtr & node,const std::shared_ptr<device::KernelInfo> & kernel_info)419 void SetParameterKernelInfo(const AnfNodePtr &node, const std::shared_ptr<device::KernelInfo> &kernel_info) {
420 MS_EXCEPTION_IF_NULL(kernel_info);
421 auto build_info = kernel_info->GetMutableSelectKernelBuildInfo();
422 if (!build_info) {
423 MS_LOG(ERROR) << "Parameter doesn't have build info: " << node->DebugString()
424 << ", full name: " << node->fullname_with_scope();
425 return;
426 }
427 std::vector<TypeId> refresh_output_types = {common::AnfAlgo::GetOutputInferDataType(node, 0)};
428 build_info->SetOutputsDeviceType(refresh_output_types);
429 }
430
SetKernelInfo(const AnfNodePtr & node)431 void SetKernelInfo(const AnfNodePtr &node) {
432 MS_EXCEPTION_IF_NULL(node);
433 // If kernel build info has been set up. skip
434 std::shared_ptr<device::KernelInfo> kernel_info =
435 std::dynamic_pointer_cast<device::KernelInfo>(node->kernel_info_ptr());
436 if (utils::isa<ParameterPtr>(node)) {
437 SetParameterKernelInfo(node, kernel_info);
438 return;
439 }
440
441 if (!kernel_info) {
442 kernel_info = std::make_shared<device::KernelInfo>();
443 MS_EXCEPTION_IF_NULL(kernel_info);
444 node->set_kernel_info(kernel_info);
445 }
446
447 auto build_info = kernel_info->GetMutableSelectKernelBuildInfo();
448 if (!build_info) {
449 auto builder = std::make_shared<kernel::KernelBuildInfo::KernelBuildInfoBuilder>();
450 MS_EXCEPTION_IF_NULL(builder);
451 build_info = builder->Build();
452 }
453
454 const auto &output_with_indexs = common::AnfAlgo::GetAllOutputWithIndex(node);
455 std::vector<TypeId> output_infer_types;
456 std::vector<std::string> output_formats;
457 for (const auto &output_with_index : output_with_indexs) {
458 (void)output_infer_types.emplace_back(
459 common::AnfAlgo::GetOutputInferDataType(output_with_index.first, output_with_index.second));
460 (void)output_formats.emplace_back(kOpFormat_DEFAULT);
461 }
462 build_info->SetOutputsDeviceType(output_infer_types);
463 build_info->SetOutputsFormat(output_formats);
464 kernel_info->set_select_kernel_build_info(build_info);
465 }
466
BuildFakeGraph(const FuncGraphPtr & anf_graph)467 bool BuildFakeGraph(const FuncGraphPtr &anf_graph) {
468 MS_EXCEPTION_IF_NULL(anf_graph);
469 #ifdef ENABLE_DUMP_IR
470 auto context = MsContext::GetInstance();
471 MS_EXCEPTION_IF_NULL(context);
472 if (context->CanDump(kIntroductory)) {
473 if (context->CanDump(kFully)) {
474 draw::Draw("anf_graph_before_build_df_graph.dot", anf_graph); // for debug
475 }
476 DumpIR("anf_graph_before_build_df_graph.ir", anf_graph, true, kWholeStack);
477 }
478 #endif
479 (void)setenv("GE_TRAIN", IsGeTrain() ? "1" : "0", 1);
480 if (!AddFakeGraph(anf_graph)) {
481 MS_LOG(ERROR) << "Add fake graph failed";
482 return false;
483 }
484 #ifdef ENABLE_DUMP_IR
485 if (context->CanDump(kIntroductory)) {
486 if (context->CanDump(kFully)) {
487 draw::Draw("anf_graph_after_build_df_graph.dot", anf_graph); // for debug
488 }
489 DumpIR("anf_graph_after_build_df_graph.ir", anf_graph, true, kWholeStack);
490 }
491 #endif
492 return true;
493 }
494
ClearForwardOutputAddress(const KernelGraphPtr & graph,const DeviceContext * device_context)495 void ClearForwardOutputAddress(const KernelGraphPtr &graph, const DeviceContext *device_context) {
496 MS_EXCEPTION_IF_NULL(graph);
497 if (!graph->has_flag(kFlagPyNativeRunInGraph)) {
498 return;
499 }
500 const auto &input_nodes = graph->input_nodes();
501 for (const auto &input : input_nodes) {
502 MS_EXCEPTION_IF_NULL(input);
503 auto parameter = input->cast<ParameterPtr>();
504 if (parameter != nullptr) {
505 if (parameter->has_user_data(kForwardOutput)) {
506 auto device_address = AnfAlgo::GetMutableOutputAddr(parameter, 0);
507 auto new_address = runtime::DeviceAddressUtils::CloneEmptyDeviceAddress(device_address, device_context);
508 AnfAlgo::SetOutputAddr(new_address, 0, parameter.get());
509 MS_LOG(DEBUG) << "Clear old address " << device_address.get() << " and set new address " << new_address.get()
510 << " to parameter " << parameter->name();
511 }
512 }
513 }
514 }
515
516 class ContextReset {
517 public:
ContextReset(DeviceContext * device_context)518 explicit ContextReset(DeviceContext *device_context) : device_context_(device_context) {}
~ContextReset()519 ~ContextReset() {
520 if (device_context_ != nullptr && device_context_->device_res_manager_ != nullptr) {
521 device_context_->device_res_manager_->BindDeviceToCurrentThread(true);
522 }
523 }
524
525 private:
526 DeviceContext *device_context_;
527 };
528
UpdateTracker(const std::string & task_name,const std::string & node_name,const std::string & graph_str,size_t size,void * device_ptr,device::tracker::MemType mem_type)529 void UpdateTracker(const std::string &task_name, const std::string &node_name, const std::string &graph_str,
530 size_t size, void *device_ptr, device::tracker::MemType mem_type) {
531 device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, task_name, node_name, graph_str);
532 device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddCompileTimeMemInfo, task_name, size, device_ptr, mem_type);
533 }
534
UpdateFMTracker(size_t feature_memory_size,const std::string & graph_name)535 void UpdateFMTracker(size_t feature_memory_size, const std::string &graph_name) {
536 device::tracker::CALL_MEMORY_TRACKER(AllocMemBlock, 0, feature_memory_size, "Ascend",
537 AscendMemAdapter::GetInstance().GetActualPeakMemory(), 0, 0, 0);
538 device::tracker::CALL_MEMORY_TRACKER(FreeMemBlock, 0, 0, 0);
539 device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, "RunGeGraph", "RunGeGraph", graph_name);
540 device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddCompileTimeMemInfo, "RunGeGraph", feature_memory_size, 0,
541 device::tracker::MemType::kGeFeatureMemory);
542 }
543
CacheFileExists(const std::string & name)544 bool CacheFileExists(const std::string &name) {
545 auto &compile_cache_context = CompileCacheContext::GetInstance();
546 auto dep_files_hash = compile_cache_context.CompileCacheDepFilesHash();
547 auto ge_graph_key = name;
548 if (!dep_files_hash.empty()) {
549 ge_graph_key = dep_files_hash + "_" + ge_graph_key;
550 }
551 auto ge_cache_path = Common::GetCompilerCachePath() + kGeCache;
552 ge_graph_key = NormalizeString(ge_graph_key);
553 auto cache_idx_file = ge_cache_path + "/" + ge_graph_key + ".idx";
554 struct stat buffer;
555 bool ret = stat(cache_idx_file.c_str(), &buffer) == 0;
556 MS_LOG(INFO) << "Cached index file name: " << cache_idx_file << " exists: " << ret;
557 return ret;
558 }
559
560 } // namespace
561
AllocInputHostMemory(const KernelGraphPtr & kernel_graph) const562 void GeGraphExecutor::AllocInputHostMemory(const KernelGraphPtr &kernel_graph) const {
563 MS_EXCEPTION_IF_NULL(kernel_graph);
564 const auto &inputs = kernel_graph->inputs();
565 auto device_id = device_context_->device_context_key().device_id_;
566 for (const auto &input : inputs) {
567 auto builder = std::make_shared<kernel::KernelBuildInfo::KernelBuildInfoBuilder>();
568 builder->SetOutputsFormat({kOpFormat_DEFAULT});
569 std::vector<TypeId> output_type = {common::AnfAlgo::GetOutputInferDataType(input, 0)};
570 builder->SetOutputsDeviceType(output_type);
571 AnfAlgo::SetSelectKernelBuildInfo(builder->Build(), input.get());
572 }
573
574 for (const auto &input_node : inputs) {
575 if (!input_node->isa<Parameter>()) {
576 MS_LOG(DEBUG) << input_node->fullname_with_scope() << " is not parameter, continue";
577 continue;
578 }
579 TypeId output_type_id = common::AnfAlgo::GetOutputInferDataType(input_node, 0);
580
581 size_t tensor_size;
582 if (kernel_graph->is_dynamic_shape()) {
583 tensor_size = 0;
584 } else {
585 std::vector<size_t> shape = Convert2SizeT(common::AnfAlgo::GetOutputInferShape(input_node, 0));
586 size_t type_size = GetTypeByte(TypeIdToType(output_type_id));
587 tensor_size = std::accumulate(shape.begin(), shape.end(), type_size, std::multiplies<size_t>());
588 }
589
590 auto input_with_index = std::make_pair(input_node, 0);
591 const auto kernel_tensor = AnfAlgo::CreateOutputKernelTensorWithDeviceInfo(
592 input_with_index, nullptr, tensor_size, kOpFormat_DEFAULT, output_type_id, {}, kAscendDevice, device_id);
593 auto device_address_ptr = std::make_shared<GeHostAddress>(kernel_tensor);
594 device_address_ptr->set_is_ptr_persisted(false);
595 AnfAlgo::SetOutputAddr(device_address_ptr, 0, input_node.get());
596 }
597 }
598
AllocOutputHostMemory(const KernelGraphPtr & kernel_graph) const599 void GeGraphExecutor::AllocOutputHostMemory(const KernelGraphPtr &kernel_graph) const {
600 MS_EXCEPTION_IF_NULL(kernel_graph);
601 auto outputs = common::AnfAlgo::GetAllOutputWithIndex(kernel_graph->output());
602 auto device_id = device_context_->device_context_key().device_id_;
603 for (const auto &output : outputs) {
604 const auto &output_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
605 auto &output_node = output_with_index.first;
606 MS_EXCEPTION_IF_NULL(output_node);
607 SetKernelInfo(output_node);
608
609 // Parameter's memory is allocated earlier, and there is no need to reallocate memory if Parameter is output.
610 if (output_node->isa<Parameter>()) {
611 continue;
612 }
613
614 auto i = output_with_index.second;
615 TypeId output_type_id = common::AnfAlgo::GetOutputInferDataType(output_node, i);
616 const auto kernel_tensor = AnfAlgo::CreateOutputKernelTensorWithDeviceInfo(
617 output_with_index, nullptr, 0, kOpFormat_DEFAULT, output_type_id, {}, kAscendDevice, device_id);
618 auto output_device_addr = std::make_shared<GeHostAddress>(kernel_tensor);
619 AnfAlgo::SetOutputAddr(output_device_addr, i, output_node.get());
620
621 if (common::AnfAlgo::IsNopNode(output_node)) {
622 auto [real_node, real_idx] = common::AnfAlgo::GetPrevNodeOutput(output_node, i, true);
623 if (real_node != output_node || real_idx != i) {
624 // set output addr size if the input node is output.
625 const auto &inputs = kernel_graph->inputs();
626 if (std::any_of(inputs.begin(), inputs.end(),
627 [&real_node](const AnfNodePtr &input_node) { return real_node == input_node; })) {
628 auto real_node_addr = AnfAlgo::GetMutableOutputAddr(real_node, real_idx);
629 output_device_addr->SetSize(real_node_addr->GetSize());
630 }
631 AnfAlgo::SetOutputAddr(output_device_addr, real_idx, real_node.get());
632 }
633 }
634 }
635 }
636
AllocConstMemory(const transform::RunOptions & options,const KernelGraphPtr & graph,size_t memory_size) const637 void GeGraphExecutor::AllocConstMemory(const transform::RunOptions &options, const KernelGraphPtr &graph,
638 size_t memory_size) const {
639 if (memory_size == 0) {
640 return;
641 }
642 MS_LOG(INFO) << "Start AllocConstMemory, memory_size: " << memory_size;
643 auto memory = ResManager()->AllocateMemory(memory_size);
644 if (memory == nullptr) {
645 MS_LOG(EXCEPTION) << "Allocate memory failed, memory size:" << memory_size << ", graph: " << graph->ToString();
646 }
647 if (common::IsNeedProfileMemory()) {
648 MS_LOG(WARNING) << "Need Profile Memory, alloc type: ConstMemory, size: " << memory_size
649 << ", graph: " << graph->ToString() << ", device address addr: " << memory;
650 }
651 UpdateTracker("AllocConstMemory", "ConstMemory", graph->ToString(), memory_size, memory,
652 device::tracker::MemType::kGeConst);
653 auto graph_runner = transform::GetGraphRunner();
654 MS_EXCEPTION_IF_NULL(graph_runner);
655 auto ret = graph_runner->SetConstMemory(options, memory, memory_size);
656 if (ret != transform::Status::SUCCESS) {
657 MS_LOG(EXCEPTION) << "SetConstMemory for graph " << options.name << " failed.";
658 }
659 MS_LOG(INFO) << "End AllocConstMemory";
660 }
661
AllocFeatureMemory(const transform::RunOptions & options,size_t memory_size) const662 void GeGraphExecutor::AllocFeatureMemory(const transform::RunOptions &options, size_t memory_size) const {
663 if (memory_size == 0) {
664 return;
665 }
666 MS_LOG(INFO) << "Start AllocFeatureMemory, memory_size: " << memory_size;
667 auto memory_manager = ResManager()->mem_manager_;
668 MS_EXCEPTION_IF_NULL(memory_manager);
669 memory_manager->ResetDynamicMemory();
670 auto memory = memory_manager->MallocWorkSpaceMem(memory_size);
671 if (memory == nullptr) {
672 MS_LOG(EXCEPTION) << "AllocFeatureMemory error, memory not enough, memory size: " << memory_size;
673 }
674 auto graph_runner = transform::GetGraphRunner();
675 MS_EXCEPTION_IF_NULL(graph_runner);
676 auto ret = graph_runner->UpdateFeatureMemory(options, memory, memory_size);
677 if (ret != transform::Status::SUCCESS) {
678 MS_LOG(EXCEPTION) << "UpdateFeatureMemory for graph " << options.name << " failed.";
679 }
680 memory_manager->ResetDynamicMemory();
681 MS_LOG(INFO) << "End AllocFeatureMemory";
682 }
683
AllocParameterMemory(const KernelGraphPtr & kernel_graph,std::set<KernelGraphPtr> * memo) const684 void GeGraphExecutor::AllocParameterMemory(const KernelGraphPtr &kernel_graph, std::set<KernelGraphPtr> *memo) const {
685 // Set Device Type to be same as Host Type, AssignStaticMemoryInput will ignore parameters without DeviceType
686 MS_EXCEPTION_IF_NULL(kernel_graph);
687 if (memo == nullptr) {
688 MS_LOG(INFO) << "Start AllocParameterMemory, kernel graph: " << kernel_graph->ToString();
689 std::set<KernelGraphPtr> memo_set;
690 AllocParameterMemory(kernel_graph, &memo_set);
691 MS_LOG(INFO) << "AllocParameterMemory finish.";
692 return;
693 } else if (memo->find(kernel_graph) != memo->end()) {
694 return;
695 }
696 (void)memo->insert(kernel_graph);
697 auto parameters = FilterAllParameters(kernel_graph);
698 for (const auto &iter : parameters) {
699 auto parameter = utils::cast<ParameterPtr>(iter.second);
700 if (parameter == nullptr) {
701 continue;
702 }
703 SetKernelInfo(parameter);
704 }
705 runtime::DeviceAddressUtils::CreateParameterDeviceAddress(device_context_, kernel_graph);
706 // call AssignStaticMemoryInput recursively
707 auto ms_context = MsContext::GetInstance();
708 MS_EXCEPTION_IF_NULL(ms_context);
709 auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
710 auto runtime_instance = device::KernelRuntimeManager::Instance().GetKernelRuntime(kAscendDevice, device_id);
711 MS_EXCEPTION_IF_NULL(runtime_instance);
712 runtime_instance->AssignStaticMemoryInput(*kernel_graph.get());
713 }
714
InitGraphInfo(const FuncGraphPtr & graph)715 void GeGraphExecutor::InitGraphInfo(const FuncGraphPtr &graph) {
716 MS_EXCEPTION_IF_NULL(graph);
717 KernelGraphPtr kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
718 MS_EXCEPTION_IF_NULL(kg);
719 BuildInputDataGeTensor(kg);
720 BuildOutputDataGeTensor(kg);
721 }
722
BuildInputDataGeTensor(const KernelGraphPtr & kernel_graph)723 void GeGraphExecutor::BuildInputDataGeTensor(const KernelGraphPtr &kernel_graph) {
724 MS_LOG(INFO) << "Start BuildInputDataGeTensor, kernel graph: " << kernel_graph->ToString();
725 MS_EXCEPTION_IF_NULL(kernel_graph);
726 std::vector<GeTensor> ge_inputs;
727 std::vector<DeviceAddress *> device_addrs;
728 std::vector<std::pair<AnfNodeWeakPtr, size_t>> need_update_input;
729 std::vector<AnfNodeWeakPtr> ge_input_nodes;
730 auto ge_input_list = kernel_graph->user_data<transform::GEInputList>();
731 if (ge_input_list) {
732 ge_input_nodes = ge_input_list->ge_inputs;
733 }
734 for (const auto &node_wptr : ge_input_nodes) {
735 auto node = node_wptr.lock();
736 if (!node) {
737 MS_LOG(ERROR) << "Get node lock failed, kerne graph: " << kernel_graph->ToString();
738 continue;
739 }
740 auto name = node->fullname_with_scope();
741 MS_LOG(INFO) << "Build input ge tensor: " << name << ", kernel graph: " << kernel_graph->graph_id();
742 auto output_addr = AnfAlgo::GetMutableOutputAddr(node, 0, false);
743 (void)device_addrs.emplace_back(output_addr.get());
744 auto shapes = trans::GetRuntimePaddingShape(node, 0);
745 auto host_type = common::AnfAlgo::GetOutputInferDataType(node, 0);
746 auto ge_tensor_desc = transform::TransformUtil::GetGeTensorDesc(shapes, host_type, kOpFormat_DEFAULT);
747 MS_EXCEPTION_IF_NULL(ge_tensor_desc);
748 ge_tensor_desc->SetPlacement(::ge::kPlacementDevice);
749 GeTensor ge_tensor(*ge_tensor_desc);
750 if (output_addr->GetMutablePtr() != nullptr) {
751 MS_LOG(INFO) << "Node: " << name << " Has addr, size: " << output_addr->GetSize();
752 if (ge_tensor.SetData(reinterpret_cast<uint8_t *>(output_addr->GetMutablePtr()), output_addr->GetSize(),
753 [](void *) {}) != ::ge::GRAPH_SUCCESS) {
754 MS_LOG(EXCEPTION) << "SetData failed, ge input data " << ge_inputs.size() << " name: " << name
755 << " size: " << output_addr->GetSize();
756 }
757 MS_LOG(INFO) << "ge input data " << ge_inputs.size() << " name: " << name << " size: " << output_addr->GetSize();
758 }
759 // The device address of input tensor may change every step.
760 // Always keep the input node address consistent with the input tensor address.
761 (void)need_update_input.emplace_back(node, ge_inputs.size());
762 (void)ge_inputs.emplace_back(std::move(ge_tensor));
763 }
764 input_datas_[kernel_graph.get()] = {ge_inputs, device_addrs, need_update_input};
765 MS_LOG(INFO) << "BuildInputDataGeTensor finish.";
766 }
767
BuildOutputDataGeTensor(const KernelGraphPtr & kernel_graph)768 void GeGraphExecutor::BuildOutputDataGeTensor(const KernelGraphPtr &kernel_graph) {
769 MS_LOG(INFO) << "Start BuildOutputDataGeTensor, kernel graph: " << kernel_graph->ToString();
770 MS_EXCEPTION_IF_NULL(kernel_graph);
771 std::vector<GeTensor> ge_outputs;
772 std::vector<DeviceAddress *> device_addrs;
773 std::vector<std::pair<AnfNodeWeakPtr, size_t>> graph_outputs;
774 auto outputs = common::AnfAlgo::GetAllOutputWithIndex(kernel_graph->output());
775 for (const auto &output : outputs) {
776 const auto &output_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
777 auto &output_node = output_with_index.first;
778 auto index = output_with_index.second;
779 MS_EXCEPTION_IF_NULL(output_node);
780 if (HasAbstractMonad(output_node)) {
781 continue;
782 }
783 if (common::AnfAlgo::IsNoOuputNode(output_node)) {
784 continue;
785 }
786 auto real_index = output_node->isa<ValueNode>() ? 0 : index;
787 auto device_addr = AnfAlgo::GetMutableOutputAddr(output_node, real_index, false);
788 (void)device_addrs.emplace_back(device_addr.get());
789 auto shapes = trans::GetRuntimePaddingShape(output_node, real_index);
790 auto host_type = common::AnfAlgo::GetOutputInferDataType(output_node, real_index);
791 auto ge_tensor_desc = transform::TransformUtil::GetGeTensorDesc(shapes, host_type, kOpFormat_DEFAULT);
792 MS_EXCEPTION_IF_NULL(ge_tensor_desc);
793 ge_tensor_desc->SetPlacement(::ge::kPlacementDevice);
794 GeTensor ge_tensor(*ge_tensor_desc);
795 (void)ge_outputs.emplace_back(std::move(ge_tensor));
796 (void)graph_outputs.emplace_back(output_node, index);
797 }
798 MS_EXCEPTION_IF_CHECK_FAIL(
799 ge_outputs.size() == graph_outputs.size(),
800 "The size of ge_outputs and graph_outputs check error, kernel graph: " + kernel_graph->ToString());
801 output_datas_[kernel_graph.get()] = {ge_outputs, device_addrs, graph_outputs};
802 MS_LOG(INFO) << "BuildOutputDataGeTensor finish.";
803 }
804
CreateOutputDeviceAddress(const KernelGraphPtr & kernel_graph,const KernelWithIndex & output_with_index,size_t need_alloc_output_cnt) const805 DeviceAddressPtr GeGraphExecutor::CreateOutputDeviceAddress(const KernelGraphPtr &kernel_graph,
806 const KernelWithIndex &output_with_index,
807 size_t need_alloc_output_cnt) const {
808 MS_EXCEPTION_IF_NULL(kernel_graph);
809 auto output_node = output_with_index.first;
810 MS_EXCEPTION_IF_NULL(output_node);
811 auto ref_map = kernel_graph->GetRefMap();
812
813 auto ms_context = MsContext::GetInstance();
814 MS_EXCEPTION_IF_NULL(ms_context);
815 auto device_id = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID);
816 auto real_index = output_node->isa<ValueNode>() ? 0 : output_with_index.second;
817 TypeId output_type_id = common::AnfAlgo::GetOutputInferDataType(output_node, real_index);
818 size_t type_size = GetTypeByte(TypeIdToType(output_type_id));
819 auto shapes = trans::GetRuntimePaddingShape(output_node, real_index);
820 auto tensor_size =
821 shapes.empty() ? type_size : std::accumulate(shapes.begin(), shapes.end(), type_size, std::multiplies<size_t>());
822 // When ValueNode is a graph output, runtime does not manage this memory
823 // output in ref_map, mem same is input
824 bool need_not_alloc = (kernel_graph->has_flag(kFlagEnableZeroCopyInGraph) && !output_node->isa<ValueNode>()) ||
825 (ref_map.find(output_with_index) != ref_map.end());
826 void *mem = need_not_alloc ? nullptr : ResManager()->AllocateMemory(tensor_size);
827
828 if (common::IsNeedProfileMemory() && !need_not_alloc) {
829 MS_LOG(WARNING) << "Need Profile Memory, alloc type: ValueNodeOutput, size:" << tensor_size
830 << ", graph: " << kernel_graph->ToString() << ", node: " << output_node->fullname_with_scope()
831 << ", device address addr: " << mem;
832 }
833 if (!need_not_alloc) {
834 UpdateTracker("ValueNodeOutput", output_node->fullname_with_scope(), kernel_graph->ToString(), tensor_size, mem,
835 device::tracker::MemType::kConstantValue);
836 }
837
838 const auto kernel_tensor = AnfAlgo::CreateOutputKernelTensorWithDeviceInfo(
839 {output_node, real_index}, mem, tensor_size, kOpFormat_DEFAULT, output_type_id, {}, kAscendDevice, device_id);
840 auto output_device_addr = std::make_shared<AscendDeviceAddress>(kernel_tensor);
841 if (ref_map.find(output_with_index) != ref_map.end()) {
842 auto input_with_index = ref_map[output_with_index];
843 auto input_device_address = AnfAlgo::GetMutableOutputAddr(input_with_index.first, input_with_index.second, false);
844 MS_EXCEPTION_IF_NULL(input_device_address);
845 MS_LOG(INFO) << "The output node " << output_node->fullname_with_scope()
846 << " is in ref_map, set the same device_address ptr as the corresponding input, input node: "
847 << input_with_index.first->fullname_with_scope();
848 // Update the reference count of device address.
849 output_device_addr->set_pointer_ref_count(input_device_address->pointer_ref_count());
850 output_device_addr->IncreaseOriginalRefCount();
851 output_device_addr->ResetRefCount();
852 }
853 output_device_addr->set_device_synchronizer(std::make_shared<AscendDeviceSynchronizer>());
854 output_device_addr->set_is_ptr_persisted(true);
855 if (IsMemoryPoolRecycle() && need_alloc_output_cnt <= kNeedRecycleOutput) {
856 MS_LOG(INFO) << "Set Memory Pool Recycle, graph: " << kernel_graph->ToString()
857 << ", node: " << output_node->fullname_with_scope();
858 output_device_addr->set_from_persistent_mem(true);
859 output_device_addr->set_need_recycle(true);
860 }
861 return output_device_addr;
862 }
863
AllocOutputMemory(const KernelGraphPtr & kernel_graph) const864 void GeGraphExecutor::AllocOutputMemory(const KernelGraphPtr &kernel_graph) const {
865 MS_EXCEPTION_IF_NULL(kernel_graph);
866 MS_LOG(INFO) << "Start AllocOutputMemory, kernel graph: " << kernel_graph->ToString();
867
868 auto outputs = common::AnfAlgo::GetAllOutputWithIndex(kernel_graph->output());
869 auto ref_map = kernel_graph->GetRefMap();
870 size_t need_alloc_output_cnt = 0;
871 for (const auto &output : outputs) {
872 const auto &output_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
873 auto &output_node = output_with_index.first;
874 if (output_node->isa<Parameter>() || output_node->isa<ValueNode>()) {
875 continue;
876 }
877 if (ref_map.find(output_with_index) != ref_map.end()) {
878 continue;
879 }
880 need_alloc_output_cnt++;
881 }
882
883 for (const auto &output : outputs) {
884 const auto &output_with_index = common::AnfAlgo::FetchRealNodeSkipMonadControl(output);
885 auto &output_node = output_with_index.first;
886 MS_EXCEPTION_IF_NULL(output_node);
887 SetKernelInfo(output_node);
888
889 // Parameter's memory is allocated earlier, and there is no need to reallocate memory if Parameter is output.
890 if (AnfAlgo::OutputAddrExist(output_node, output_with_index.second, false) || output_node->isa<Parameter>()) {
891 MS_LOG(INFO) << "The device_address of output node " << output_node->fullname_with_scope()
892 << " is already exist, skip.";
893 continue;
894 }
895
896 auto output_device_addr = CreateOutputDeviceAddress(kernel_graph, output_with_index, need_alloc_output_cnt);
897 AnfAlgo::SetOutputAddr(output_device_addr, output_with_index.second, output_node.get());
898 MS_LOG(INFO) << "Output node info: (name " << output_node->fullname_with_scope() << ", "
899 << output_node->DebugString() << " ), output size: " << output_device_addr->GetSize()
900 << ", device_address: " << output_device_addr;
901 // When both the input and output of NopNode are used as outputs, different memory needs to be allocated for them.
902 }
903 MS_LOG(INFO) << "AllocOutputMemory finish.";
904 }
905
ResManager() const906 GeDeviceResManager *GeGraphExecutor::ResManager() const {
907 MS_EXCEPTION_IF_NULL(device_context_);
908 auto res_manager = dynamic_cast<GeDeviceResManager *>(device_context_->device_res_manager_.get());
909 MS_EXCEPTION_IF_NULL(res_manager);
910 return res_manager;
911 }
912
PreprocessBeforeRun(const KernelGraphPtr & graph)913 void GeGraphExecutor::PreprocessBeforeRun(const KernelGraphPtr &graph) {
914 auto ret = CompileGraph(graph, {});
915 if (!ret) {
916 MS_LOG(EXCEPTION) << "Compile graph fail, graph id: " << graph->graph_id();
917 }
918 }
919
BuildGraph(const KernelGraphPtr & graph,const transform::TensorOrderMap & tensor_order_map)920 bool GeGraphExecutor::BuildGraph(const KernelGraphPtr &graph, const transform::TensorOrderMap &tensor_order_map) {
921 std::set<KernelGraphPtr> memo;
922 GEGraphOptimization::GetInstance().OptimizeGEGraph(graph, &memo);
923 auto &compile_cache_context = CompileCacheContext::GetInstance();
924 auto use_compile_cache = compile_cache_context.UseCompileCache();
925 auto name = GetGraphName(graph);
926 bool has_cache = CacheFileExists(name);
927 if (use_compile_cache && has_cache) {
928 MS_LOG(INFO) << "Use ge compile cache, and skip specific optimization and ge_adapter execution";
929 if (!BuildFakeGraph(graph)) {
930 return false;
931 }
932 } else {
933 (void)BuildDFGraph(graph, tensor_order_map, false);
934 }
935 return true;
936 }
937
AllocMemory(const KernelGraphPtr & graph)938 void GeGraphExecutor::AllocMemory(const KernelGraphPtr &graph) {
939 AllocParameterMemory(graph);
940 AllocOutputMemory(graph);
941 EnableGraphInputZeroCopy(graph);
942 EnableGraphOutputZeroCopy(graph);
943 }
944
CompileGraph(const KernelGraphPtr & graph,const std::map<string,string> &)945 bool GeGraphExecutor::CompileGraph(const KernelGraphPtr &graph,
946 const std::map<string, string> & /* compile_options */) {
947 MS_EXCEPTION_IF_NULL(graph);
948 MS_LOG(INFO) << "ge graph executor compile graph " << graph->ToString();
949 auto &compile_cache_context = CompileCacheContext::GetInstance();
950 auto use_compile_cache = compile_cache_context.UseCompileCache();
951 std::map<std::string, ShapeVector> origin_shape;
952 const auto &tensor_order_map = GetDefaultParams(graph, &origin_shape);
953 auto name = GetGraphName(graph);
954 bool has_cache = CacheFileExists(name);
955 if (use_compile_cache && has_cache) {
956 MS_LOG(INFO) << "Use ge compile cache, and skip specific optimization and ge_adapter execution";
957 std::set<KernelGraphPtr> memo;
958 GEGraphOptimization::GetInstance().OptimizeGEGraph(graph, &memo);
959 if (!BuildFakeGraph(graph)) {
960 return false;
961 }
962 } else {
963 (void)BuildGraph(graph, tensor_order_map);
964 }
965 SetDynamicShapeAttr(graph);
966 transform::RunOptions run_options;
967 run_options.name = GetGraphName(graph);
968 auto graph_runner = transform::GetGraphRunner();
969 if (graph_runner == nullptr) {
970 MS_LOG(EXCEPTION) << "Can not found GraphRunner.";
971 }
972 // create loop var
973 RunInitGraph(run_options.name);
974 if (graph->is_dynamic_shape()) {
975 // Release GIL before calling into (potentially long-running) C++ code
976 GilReleaseWithCheck gil_release;
977 auto ret = graph_runner->CompileGraph(run_options);
978 if (ret != transform::Status::SUCCESS) {
979 MS_LOG(EXCEPTION) << "Compile graph " << run_options.name << " failed.";
980 }
981 } else {
982 ::ge::CompiledGraphSummaryPtr ge_graph_summary = nullptr;
983 {
984 // Release GIL before calling into (potentially long-running) C++ code
985 GilReleaseWithCheck gil_release;
986 auto ret = graph_runner->CompileGraph(run_options, &ge_graph_summary);
987 if (ret != transform::Status::SUCCESS) {
988 MS_LOG(EXCEPTION) << "Compile graph " << run_options.name << " failed.";
989 }
990 }
991 GraphSummary summary(ge_graph_summary);
992 MS_LOG(INFO) << "Graph " << run_options.name << " summary: " << summary.ToString();
993 feature_memorys[run_options.name] = summary.feature_memory_size;
994 streams[run_options.name] = summary.stream_num;
995 AllocConstMemory(run_options, graph, summary.const_memory_size);
996 AllocFeatureMemory(run_options, summary.feature_memory_size);
997 AddRefCorrespondPairs(graph, summary.io_indexes);
998 }
999 AllocMemory(graph);
1000
1001 graph->set_run_mode(RunMode::kGraphMode);
1002 graph->set_memory_managed_by_ge(true);
1003 if (ConfigManager::GetInstance().dataset_mode() == DatasetMode::DS_SINK_MODE) {
1004 graph->set_is_loop_count_sink(true);
1005 }
1006 RevertOriginShape(graph, origin_shape);
1007 return true;
1008 }
1009
AddRefCorrespondPairs(const KernelGraphPtr & graph,const std::vector<std::pair<uint32_t,uint32_t>> & io_indexes) const1010 void GeGraphExecutor::AddRefCorrespondPairs(const KernelGraphPtr &graph,
1011 const std::vector<std::pair<uint32_t, uint32_t>> &io_indexes) const {
1012 MS_LOG(INFO) << "Start convert io_indexes to ref_map, kernel graph: " << graph->ToString();
1013 MS_EXCEPTION_IF_NULL(graph);
1014
1015 std::map<session::AnfWithOutIndex, session::AnfWithOutIndex> ref_out_in_map = {};
1016 auto graph_inputs_all = graph->parameters();
1017 std::vector<AnfNodePtr> graph_inputs = {};
1018 for (auto &node : graph_inputs_all) {
1019 MS_EXCEPTION_IF_NULL(node);
1020 auto abs = node->abstract();
1021 MS_EXCEPTION_IF_NULL(abs);
1022 if (HasAbstractMonad(node) || abs->isa<abstract::AbstractSequence>()) {
1023 MS_LOG(INFO) << "Input node: " << node->DebugString() << " is a monad or tuple/list parameter, skip.";
1024 continue;
1025 }
1026 graph_inputs.emplace_back(node);
1027 }
1028
1029 std::vector<common::KernelWithIndex> graph_outputs_all = {};
1030 common::AnfAlgo::GetRealInputs(graph->get_return(), &graph_outputs_all);
1031 std::vector<common::KernelWithIndex> graph_outputs = {};
1032
1033 for (auto &node_with_index : graph_outputs_all) {
1034 if (common::AnfAlgo::IsNoOuputNode(node_with_index.first) || HasAbstractMonad(node_with_index.first)) {
1035 MS_LOG(INFO) << "Output node: " << node_with_index.first->fullname_with_scope()
1036 << " is a no output node or monad node, skip.";
1037 continue;
1038 }
1039
1040 graph_outputs.emplace_back(node_with_index);
1041 }
1042
1043 for (auto in_out_index : io_indexes) {
1044 if (in_out_index.first >= graph_inputs.size() || in_out_index.second >= graph_outputs.size()) {
1045 MS_LOG(EXCEPTION) << "The io_indexes out of range, input index: " << in_out_index.first
1046 << ", output index: " << in_out_index.second << ", graph input size: " << graph_inputs.size()
1047 << ", graph output size: " << graph_outputs.size();
1048 }
1049 session::AnfWithOutIndex origin_node = std::make_pair(graph_inputs[in_out_index.first], 0);
1050 session::AnfWithOutIndex final_node = graph_outputs[in_out_index.second];
1051 if (origin_node.first == final_node.first) {
1052 MS_LOG(INFO) << "The origin node is same as final node, node: " << origin_node.first->fullname_with_scope();
1053 continue;
1054 }
1055 if (ref_out_in_map.count(final_node) != 0) {
1056 MS_LOG(INFO) << "The node is already in ref_out_in_map, node: " << final_node.first->fullname_with_scope()
1057 << ", index: " << final_node.second;
1058 continue;
1059 }
1060 // if input node is not abstract ref, set ref may cause memory reuse error
1061 auto abs = origin_node.first->abstract();
1062 if (!abs->isa<abstract::AbstractRefTensor>()) {
1063 MS_LOG(INFO) << "The node is not abstract tensor: " << final_node.first->fullname_with_scope()
1064 << ", index: " << final_node.second;
1065 continue;
1066 }
1067
1068 ref_out_in_map.emplace(final_node, origin_node);
1069 MS_LOG(INFO) << "Convert io_index [" << in_out_index.first << ", " << in_out_index.second
1070 << "] to ref_out_in_map, final_node: " << final_node.first->fullname_with_scope()
1071 << ", index:" << final_node.second << ", origin_node: " << origin_node.first->fullname_with_scope()
1072 << ", index: " << origin_node.second;
1073 }
1074
1075 graph->set_ref_out_in_map(ref_out_in_map);
1076 }
1077
CompileGraph(const FuncGraphPtr & graph,const std::map<string,string> & compile_options)1078 bool GeGraphExecutor::CompileGraph(const FuncGraphPtr &graph, const std::map<string, string> &compile_options) {
1079 MS_EXCEPTION_IF_NULL(graph);
1080
1081 auto graph_name = GetGraphName(graph);
1082 profiler::CollectHostInfo("Ascend", "CompileGraph", "GeCompileGraph_" + graph_name, 1, 0, kCollectHostInfoStart);
1083
1084 // cppcheck-suppress unreadVariable
1085 ContextReset reset_context(device_context_);
1086 KernelGraphPtr kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
1087 MS_EXCEPTION_IF_NULL(kg);
1088 if (IsEnableRefMode()) {
1089 auto ret = CompileGraph(kg, compile_options);
1090 profiler::CollectHostInfo("Ascend", "CompileGraph", "GeCompileGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
1091 return ret;
1092 } else {
1093 // delete SetCPUMemManager when delete env MS_DISABLE_REF_MODE
1094 ResManager()->SetCPUMemManager();
1095 std::map<std::string, ShapeVector> origin_shape;
1096 const auto &tensor_order_map = GetDefaultParams(graph, &origin_shape);
1097 auto &compile_cache_context = CompileCacheContext::GetInstance();
1098 auto use_compile_cache = compile_cache_context.UseCompileCache();
1099 if (use_compile_cache) {
1100 MS_LOG(INFO) << "Use ge compile cache, and skip specific optimization and ge_adapter execution";
1101 std::set<KernelGraphPtr> memo;
1102 GEGraphOptimization::GetInstance().OptimizeGEGraph(kg, &memo);
1103 if (!BuildFakeGraph(kg)) {
1104 profiler::CollectHostInfo("Ascend", "CompileGraph", "GeCompileGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
1105 return false;
1106 }
1107 } else {
1108 (void)BuildGraph(kg, tensor_order_map);
1109 }
1110 SetDynamicShapeAttr(kg);
1111 AllocInputHostMemory(kg);
1112 AllocOutputHostMemory(kg);
1113 kg->set_run_mode(RunMode::kGraphMode);
1114 if (ConfigManager::GetInstance().dataset_mode() == DatasetMode::DS_SINK_MODE) {
1115 kg->set_is_loop_count_sink(true);
1116 }
1117 // copy init weight to device
1118 RunGEInitGraph(kg);
1119 RevertOriginShape(kg, origin_shape);
1120 profiler::CollectHostInfo("Ascend", "CompileGraph", "GeCompileGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
1121 return true;
1122 }
1123 }
1124
// Hands the data of each GE output tensor to the device address of the matching graph output, then updates
// the output node's shape from the tensor's runtime dims.
// Only host-placed data is supported. The data must be aligned to kTensorAlignBytes, and string outputs are
// rejected; each violation raises an exception. `ge_outputs[i]` and `me_types[i]` correspond to
// `graph_outputs[i]`.
void SetOutputs(const std::vector<KernelWithIndex> &graph_outputs,
                const std::vector<transform::GeTensorPtr> &ge_outputs, const std::vector<TypeId> &me_types) {
  // Every graph output indexes both ge_outputs and me_types; guard against out-of-range access.
  if (ge_outputs.size() < graph_outputs.size() || me_types.size() < graph_outputs.size()) {
    MS_LOG(EXCEPTION) << "The number of ge outputs (" << ge_outputs.size() << ") or output types ("
                      << me_types.size() << ") is less than the number of graph outputs (" << graph_outputs.size()
                      << ").";
  }
  constexpr int64_t kTensorAlignBytes = 64;
  for (size_t i = 0; i < graph_outputs.size(); ++i) {
    const auto &[output_node, idx] = common::AnfAlgo::FetchRealNodeSkipMonadControl(graph_outputs[i]);
    const auto &tensor = ge_outputs[i];
    MS_EXCEPTION_IF_NULL(tensor);
    auto output_addr = AnfAlgo::GetMutableOutputAddr(output_node, idx);
    MS_EXCEPTION_IF_NULL(output_addr);
    ::ge::Placement dp = tensor->GetTensorDesc().GetPlacement();
    // Keep GE's buffer owned by the unique_ptr until every check has passed. If any check throws, the buffer
    // is released by its deleter instead of leaking (it used to be release()d before the checks).
    auto &&ge_data_uni = tensor->ResetData();
    auto ge_data = ge_data_uni.get();
    MS_EXCEPTION_IF_NULL(ge_data);
    if (dp != ::ge::kPlacementHost) {
      MS_LOG(EXCEPTION) << "It is not supported that Output node " << output_node->DebugString()
                        << "'s output data's placement is device now.";
    }
    if (reinterpret_cast<uintptr_t>(ge_data) % kTensorAlignBytes != 0) {
      MS_LOG(EXCEPTION) << "Skip zero-copy ge tensor " << reinterpret_cast<uintptr_t>(ge_data)
                        << ", bytes not aligned with expected.";
    }
    if (me_types[i] == TypeId::kObjectTypeString) {
      MS_LOG(EXCEPTION) << "It is not supported that Output node " << output_node->DebugString()
                        << "'s output data type is string now.";
    }
    MS_LOG(DEBUG) << "Zero-copy ge tensor " << reinterpret_cast<uintptr_t>(ge_data) << " as aligned with "
                  << kTensorAlignBytes << " types.";
    // All checks passed: transfer ownership (buffer + deleter) to the device address.
    auto deleter = ge_data_uni.get_deleter();
    (void)ge_data_uni.release();
    output_addr->set_is_ptr_persisted(false);
    output_addr->set_from_mem_pool(false);
    output_addr->set_deleter(deleter);
    output_addr->set_ptr(ge_data);
    output_addr->SetSize(tensor->GetSize());
    auto actual_shapes = tensor->GetTensorDesc().GetShape().GetDims();
    UpdateOutputNodeShape(output_node, idx, me_types[i], actual_shapes);
  }
}
1161
// Hands the data of one GE output tensor (`ge_output`) to the device address of output `idx` of `output_node`.
// - ValueNode outputs: the GE buffer is detached from `ge_output` and immediately freed with its deleter;
//   nothing else is updated.
// - Other outputs: every runtime dim must be non-negative (otherwise an exception is raised). The device
//   address takes the GE buffer, its size and its deleter, and the kernel tensor's shape vector is set to
//   the runtime dims.
// - Host-placed data is additionally copied into memory allocated from `res_manager` via SyncHostToDevice,
//   and the device address is repointed to that memory.
// NOTE(review): in the host-placement branch, the GE host buffer (`ge_data`) is no longer referenced by the
// device address after set_ptr(mem), while the GE deleter set above stays attached to the address. Confirm
// that the host buffer is freed somewhere, and that this deleter is never applied to pool memory `mem`.
void SetOutput(GeDeviceResManager *res_manager, GeTensor *ge_output, const AnfNodePtr &output_node, size_t idx) {
  if (output_node->isa<ValueNode>()) {
    // A ValueNode output is not refreshed from GE; just free the buffer GE produced for it.
    auto &&ge_data_uni = ge_output->ResetData();
    auto deleter = ge_data_uni.get_deleter();
    auto ge_data = ge_data_uni.release();
    deleter(ge_data);
    return;
  }
  auto actual_shapes = ge_output->GetTensorDesc().GetShape().GetDims();
  for (size_t i = 0; i < actual_shapes.size(); ++i) {
    if (actual_shapes[i] < 0) {
      MS_LOG(EXCEPTION) << "Output shape must be greater than 0, but got " << actual_shapes;
    }
  }
  auto output_addr = AnfAlgo::GetMutableOutputAddr(output_node, idx, false);
  output_addr->SetSize(ge_output->GetSize());
  // Detach GE's buffer; from here on the device address owns it through GE's deleter.
  auto &&ge_data_uni = ge_output->ResetData();
  auto deleter = ge_data_uni.get_deleter();
  auto ge_data = ge_data_uni.release();
  MS_EXCEPTION_IF_NULL(ge_data);
  output_addr->set_is_ptr_persisted(false);
  output_addr->set_from_mem_pool(false);
  output_addr->set_deleter(deleter);
  output_addr->set_ptr(ge_data);
  auto placement = ge_output->GetTensorDesc().GetPlacement();
  if (placement == ::ge::kPlacementHost) {
    // Host data: copy it into pool memory and point the device address at that memory instead.
    MS_LOG(DEBUG) << output_node->DebugString() << "'s output data's placement is host";
    size_t size = ge_output->GetSize();
    void *mem = res_manager->AllocateMemory(size);
    if (mem == nullptr) {
      MS_LOG(EXCEPTION) << "Allocate memory failed, memory size:" << size
                        << ", output_node: " << output_node->ToString();
    }
    output_addr->set_from_mem_pool(true);
    output_addr->set_ptr(mem);
    auto *ascend_addr = dynamic_cast<AscendDeviceAddress *>(output_addr.get());
    MS_EXCEPTION_IF_NULL(ascend_addr);
    ascend_addr->SyncHostToDevice(size, ge_data);
  }
  // Update shape in kernel tensor.
  const auto &kernel_tensor = AnfAlgo::GetOutputKernelTensor(output_node, idx);
  MS_EXCEPTION_IF_NULL(kernel_tensor);
  kernel_tensor->SetShapeVector(actual_shapes);
  MS_LOG(INFO) << "[ZeroCopy] Update output " << output_node->DebugString() << " address to "
               << output_addr->GetMutablePtr() << ", shape:" << actual_shapes
               << ", type: " << TypeIdToString(output_addr->type_id()) << ", format: " << output_addr->format();
}
1209
SetDynamicOutputs(const std::vector<KernelWithIndex> & graph_outputs,std::vector<GeTensor> * ge_outputs,GeDeviceResManager * res_manager)1210 void SetDynamicOutputs(const std::vector<KernelWithIndex> &graph_outputs, std::vector<GeTensor> *ge_outputs,
1211 GeDeviceResManager *res_manager) {
1212 MS_EXCEPTION_IF_NULL(res_manager);
1213 size_t ge_outputs_index = 0;
1214 size_t ge_outputs_size = ge_outputs->size();
1215 for (size_t i = 0; i < graph_outputs.size(); ++i) {
1216 const auto &[output_node, idx] = common::AnfAlgo::FetchRealNodeSkipMonadControl(graph_outputs[i]);
1217 MS_EXCEPTION_IF_NULL(output_node);
1218 if (HasAbstractMonad(output_node)) {
1219 continue;
1220 }
1221 if (common::AnfAlgo::IsNoOuputNode(output_node)) {
1222 continue;
1223 }
1224 if (ge_outputs_index >= ge_outputs_size) {
1225 MS_LOG(EXCEPTION) << "GE data access is out of bounds, which the current index value is " << ge_outputs_index
1226 << ", the total number of GE output is " << ge_outputs_size << ".";
1227 }
1228 SetOutput(res_manager, &((*ge_outputs)[ge_outputs_index++]), output_node, idx);
1229 }
1230 }
1231
GetGraphFeatureMemory(const FuncGraphPtr & graph) const1232 size_t GeGraphExecutor::GetGraphFeatureMemory(const FuncGraphPtr &graph) const {
1233 MS_EXCEPTION_IF_NULL(graph);
1234 auto graph_name = GetGraphName(graph);
1235 auto iter = feature_memorys.find(graph_name);
1236 if (iter == feature_memorys.end()) {
1237 MS_LOG(EXCEPTION) << "Graph " << graph_name << " feature memory not found.";
1238 }
1239 auto stream_iter = streams.find(graph_name);
1240 if (stream_iter == streams.end()) {
1241 MS_LOG(EXCEPTION) << "Graph " << graph_name << " stream not found.";
1242 }
1243 MS_LOG(WARNING) << "Need Profile Memory, graph: " << graph_name << ", stream: " << stream_iter->second;
1244 auto max_static_memory_size = ResManager()->GetMaxUsedMemorySize();
1245 auto feature_memory_size = iter->second;
1246 auto total_memory_size = max_static_memory_size + feature_memory_size;
1247 AscendMemAdapter::GetInstance().UpdateActualPeakMemory(total_memory_size);
1248 UpdateFMTracker(feature_memory_size, graph_name);
1249 return feature_memory_size;
1250 }
CurGraphSinkSize(std::string graph_name)1251 int64_t GeGraphExecutor::CurGraphSinkSize(std::string graph_name) {
1252 int64_t sink_size = -1;
1253 auto result = graph_sink_size_.find(graph_name);
1254 if (result != graph_sink_size_.end()) {
1255 sink_size = result->second;
1256 } else {
1257 auto ms_context = MsContext::GetInstance();
1258 MS_EXCEPTION_IF_NULL(ms_context);
1259 if (ConfigManager::GetInstance().dataset_mode() == DS_SINK_MODE &&
1260 ms_context->get_param<bool>(MS_CTX_ENABLE_LOOP_SINK)) {
1261 sink_size = ConfigManager::GetInstance().iter_num();
1262 }
1263 MS_LOG(INFO) << "Graph [" << graph_name << "] sink size is " << sink_size;
1264 graph_sink_size_.insert(std::pair(graph_name, sink_size));
1265 }
1266 return sink_size;
1267 }
1268
RunGraphRefMode(const FuncGraphPtr & graph,const std::vector<tensor::Tensor> & inputs)1269 bool GeGraphExecutor::RunGraphRefMode(const FuncGraphPtr &graph, const std::vector<tensor::Tensor> &inputs) {
1270 MS_EXCEPTION_IF_NULL(graph);
1271 auto graph_name = GetGraphName(graph);
1272 RunInitGraph(graph_name);
1273 MS_LOG(INFO) << "GE run graph start in ref mode, graph: " << graph_name << ".";
1274 (void)ResManager()->BindDeviceToCurrentThread(false);
1275
1276 // call ge rungraph
1277 KernelGraphPtr kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
1278 transform::RunOptions run_options;
1279 run_options.name = graph_name;
1280 auto graph_runner = transform::GetGraphRunner();
1281 if (graph_runner == nullptr) {
1282 MS_LOG(EXCEPTION) << "Can not found GraphRunner.";
1283 }
1284
1285 std::vector<GeTensor> ge_inputs = GenerateInputGeTensor(kg);
1286 std::vector<GeTensor> ge_outputs = GenerateOutputGeTensor(kg);
1287
1288 bool is_dynamic_shape = kg->is_dynamic_shape();
1289 if (IsMemoryPoolRecycle() && !is_dynamic_shape) {
1290 auto max_static_memory_size = ResManager()->GetMaxUsedMemorySize();
1291 auto iter = feature_memorys.find(graph_name);
1292 if (iter == feature_memorys.end()) {
1293 MS_LOG(EXCEPTION) << "Graph " << graph_name << " feature memory not found.";
1294 }
1295 auto feature_memory_size = iter->second;
1296 if (feature_memory_size != 0) {
1297 size_t total_memory_size = max_static_memory_size + feature_memory_size;
1298 size_t max_hbm_memory_size = static_cast<size_t>(AscendMemAdapter::GetInstance().GetMsUsedHbmSize());
1299 AscendMemAdapter::GetInstance().UpdateActualPeakMemory(total_memory_size);
1300 UpdateFMTracker(feature_memory_size, graph_name);
1301 if (common::IsNeedMemoryStatistic()) {
1302 MS_LOG(WARNING) << "Now Memory Status, graph: " << graph_name
1303 << ", max_static_memory_size: " << max_static_memory_size
1304 << ", feature_memory_size: " << feature_memory_size
1305 << ", max_hbm_memory_size: " << max_hbm_memory_size;
1306 }
1307 if (total_memory_size > max_hbm_memory_size) {
1308 MS_LOG(EXCEPTION) << "Memory pool not enough, graph: " << graph_name
1309 << ", max_static_memory_size: " << max_static_memory_size
1310 << ", feature_memory_size: " << feature_memory_size
1311 << ", max_hbm_memory_size: " << max_hbm_memory_size;
1312 }
1313 }
1314 }
1315
1316 {
1317 // Release GIL before calling into (potentially long-running) C++ code
1318 GilReleaseWithCheck gil_release;
1319 MS_LOG(INFO) << "Run graph begin, inputs size is: " << inputs.size() << ", " << graph_name;
1320 if (IsNeedNotifyTTP(graph)) {
1321 MS_LOG(INFO) << "Found optimizer sub graph and send event to mindio";
1322 auto sync_ret = ResManager()->SyncStream();
1323 if (!sync_ret) {
1324 MS_LOG(EXCEPTION) << "Sync stream failed";
1325 } else {
1326 mindio::MindIOAdapter::GetInstance()->NotifyStartUpdatingOs();
1327 }
1328 }
1329 transform::Status ret =
1330 transform::RunGraphWithStreamAsync(graph_runner, run_options, ResManager()->GetStream(), ge_inputs, &ge_outputs);
1331 if (ret != transform::Status::SUCCESS) {
1332 MS_LOG(EXCEPTION) << "Exec graph failed";
1333 }
1334 }
1335 if (is_dynamic_shape) {
1336 auto graph_outputs = common::AnfAlgo::GetAllOutputWithIndex(graph->output());
1337 SetDynamicOutputs(graph_outputs, &ge_outputs, ResManager());
1338 auto sync_ret = ResManager()->SyncStream();
1339 if (!sync_ret) {
1340 MS_LOG(EXCEPTION) << "Sync stream failed";
1341 }
1342 }
1343 ClearForwardOutputAddress(kg, device_context_);
1344 return true;
1345 }
1346
DoAsyncCkpt(const FuncGraphPtr & graph)1347 void GeGraphExecutor::DoAsyncCkpt(const FuncGraphPtr &graph) {
1348 MS_EXCEPTION_IF_NULL(graph);
1349 auto kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
1350 auto ms_context = MsContext::GetInstance();
1351 MS_EXCEPTION_IF_NULL(ms_context);
1352 auto env = common::GetEnv("MS_ENABLE_CKPT_D2H_ASYNC");
1353 if (env == "1" && ms_context->get_param<bool>(MS_CTX_NEED_CKPT) && kg != nullptr) {
1354 auto cur_step = ms_context->get_param<int>(MS_CTX_CUR_STEP_NUM);
1355 auto save_steps = ms_context->get_param<int>(MS_CTX_SAVE_CKPT_STEPS);
1356 auto last_triggered_step = ms_context->get_param<int>(MS_CTX_LAST_TRIGGERED_STEP);
1357 MS_LOG(DEBUG) << "cur_step:" << cur_step << ", save_steps: " << save_steps
1358 << ", last_triggered_step:" << last_triggered_step;
1359 if (cur_step >= (last_triggered_step + save_steps)) {
1360 if (SkipOrResetCopyAction()) {
1361 MS_LOG(INFO) << "Enable async d2h copy";
1362 SavePrevStepWeight(kg->GetRootWeights(), ResManager()->GetCopyDataStream());
1363 }
1364 if (kg->has_attr(kIsRefGraph) && GetValue<bool>(kg->get_attr(kIsRefGraph)) && SkipOrResetSyncAction()) {
1365 MS_LOG(INFO) << "Ref graph sync once action";
1366 SyncCopyStream(ResManager()->GetCopyDataStream());
1367 }
1368 }
1369 }
1370 }
1371
IsNeedNotifyTTP(const FuncGraphPtr & graph)1372 bool GeGraphExecutor::IsNeedNotifyTTP(const FuncGraphPtr &graph) {
1373 MS_EXCEPTION_IF_NULL(graph);
1374 auto kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
1375 if (mindio::MindIOAdapter::GetInstance()->IsEnable() && kg != nullptr && kg->has_attr(kIsRefGraph) &&
1376 GetValue<bool>(kg->get_attr(kIsRefGraph))) {
1377 return true;
1378 }
1379 return false;
1380 }
1381
RunGraph(const FuncGraphPtr & graph,const std::vector<tensor::Tensor> & inputs,std::vector<tensor::Tensor> * outputs,const std::map<string,string> &)1382 bool GeGraphExecutor::RunGraph(const FuncGraphPtr &graph, const std::vector<tensor::Tensor> &inputs,
1383 std::vector<tensor::Tensor> *outputs,
1384 const std::map<string, string> & /* compile_options */) {
1385 MS_EXCEPTION_IF_NULL(graph);
1386 auto graph_name = GetGraphName(graph);
1387 profiler::CollectHostInfo("Ascend", "RunGraph", "GeRunGraph_" + graph_name, 1, 0, kCollectHostInfoStart);
1388 DoAsyncCkpt(graph);
1389 if (IsEnableRefMode()) {
1390 if (!RunGraphRefMode(graph, inputs)) {
1391 profiler::CollectHostInfo("Ascend", "RunGraph", "GeRunGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
1392 return false;
1393 }
1394 } else {
1395 MS_LOG(INFO) << "GE run graph start, graph: " << graph_name << ".";
1396 (void)ResManager()->BindDeviceToCurrentThread(false);
1397 // copy input from device to host
1398 const auto &cur_inputs = graph->get_inputs();
1399 std::vector<tensor::TensorPtr> input_tensors;
1400 for (const auto &input : cur_inputs) {
1401 MS_EXCEPTION_IF_NULL(input);
1402 auto output_addr = AnfAlgo::GetMutableOutputAddr(input, 0);
1403 auto shapes = trans::GetRuntimePaddingShape(input, 0);
1404 auto host_type = common::AnfAlgo::GetOutputInferDataType(input, 0);
1405 auto tensor = std::make_shared<tensor::Tensor>(host_type, shapes);
1406 MS_EXCEPTION_IF_NULL(tensor);
1407 tensor->set_device_address(output_addr, false);
1408 tensor->data_sync();
1409 (void)input_tensors.emplace_back(std::move(tensor));
1410 }
1411 auto ge_inputs = transform::ConvertInputTensors(input_tensors, kOpFormat_NCHW);
1412
1413 // call ge rungraph
1414 KernelGraphPtr kg = std::dynamic_pointer_cast<session::KernelGraph>(graph);
1415 if (kg != nullptr) {
1416 graph_name = kg->GetFuncGraph()->ToString();
1417 }
1418 transform::RunOptions run_options;
1419 run_options.name = graph_name;
1420 auto graph_runner = transform::GetGraphRunner();
1421 if (graph_runner == nullptr) {
1422 MS_LOG(EXCEPTION) << "Can not found GraphRunner.";
1423 }
1424
1425 AnfNodePtr output = graph->get_return()->input(1);
1426 MS_EXCEPTION_IF_NULL(output);
1427 std::vector<TypeId> me_types;
1428 auto output_c = output->cast<CNodePtr>()->abstract();
1429 // get output node data types
1430 GetMeRetDataType(output_c, &me_types);
1431 std::vector<transform::GeTensorPtr> ge_outputs;
1432 {
1433 // Release GIL before calling into (potentially long-running) C++ code
1434 GilReleaseWithCheck gil_release;
1435 MS_LOG(DEBUG) << "Run graph begin, inputs size is: " << inputs.size();
1436 transform::Status ret = transform::RunGraphAsync(graph_runner, run_options, ge_inputs, &ge_outputs);
1437 MS_LOG(DEBUG) << "Run graph finish, outputs size is: " << ge_outputs.size();
1438 if (ret == transform::Status::NOT_FOUND) {
1439 MS_LOG(WARNING) << "The Graph[" << graph_name << "] is not found, skip run it.";
1440 profiler::CollectHostInfo("Ascend", "RunGraph", "GeRunGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
1441 return true;
1442 } else if (ret != transform::Status::SUCCESS) {
1443 MS_LOG(EXCEPTION) << "Exec graph failed";
1444 }
1445 }
1446 auto no_output = common::AnfAlgo::IsNoOuputNode(output);
1447 if (!no_output) {
1448 if (me_types.size() != ge_outputs.size()) {
1449 MS_LOG(EXCEPTION) << "Invalid output size, me_type's size " << me_types.size() << " tensor size "
1450 << ge_outputs.size();
1451 }
1452 // copy output from host to device
1453 auto graph_outputs = common::AnfAlgo::GetAllOutputWithIndex(graph->output());
1454 if (graph_outputs.size() != ge_outputs.size()) {
1455 MS_LOG(EXCEPTION) << "Invalid output size, graph's size " << graph_outputs.size() << " tensor size "
1456 << ge_outputs.size();
1457 }
1458 SetOutputs(graph_outputs, ge_outputs, me_types);
1459 }
1460 }
1461 if (graph->has_flag(transform::kGraphFlagHasGetNext)) {
1462 MS_LOG(DEBUG) << "Reset ConfigManager, graph: " << graph_name;
1463 ConfigManager::GetInstance().ResetConfig();
1464 ConfigManager::GetInstance().ResetIterNum();
1465 }
1466 profiler::CollectHostInfo("Ascend", "RunGraph", "GeRunGraph_" + graph_name, 1, 0, kCollectHostInfoEnd);
1467 MS_LOG(INFO) << "GE run graph end.";
1468 return true;
1469 }
1470
// Converts `anf_graph` into a GE (DF) graph via AddDFGraph and returns `anf_graph` itself.
// When ENABLE_DUMP_IR is defined and the context allows dumping, the ANF graph is dumped (.ir, plus .dot at
// kFully level) before and after the conversion.
// Returns nullptr if AddDFGraph fails.
FuncGraphPtr GeGraphExecutor::BuildDFGraph(const FuncGraphPtr &anf_graph,
                                           const transform::TensorOrderMap &init_inputs_map, bool export_air) {
  MS_EXCEPTION_IF_NULL(anf_graph);
#ifdef ENABLE_DUMP_IR
  auto context = MsContext::GetInstance();
  MS_EXCEPTION_IF_NULL(context);
  if (context->CanDump(kIntroductory)) {
    if (context->CanDump(kFully)) {
      draw::Draw("anf_graph_before_build_df_graph.dot", anf_graph);  // for debug
    }
    DumpIR("anf_graph_before_build_df_graph.ir", anf_graph, true, kWholeStack);
  }
#endif

  if (!AddDFGraph(anf_graph, init_inputs_map, export_air)) {
    MS_LOG(ERROR) << "GenConvertor failed";
    return nullptr;
  }

#ifdef ENABLE_DUMP_IR
  if (context->CanDump(kIntroductory)) {
    if (context->CanDump(kFully)) {
      draw::Draw("anf_graph_after_build_df_graph.dot", anf_graph);  // for debug
    }
    DumpIR("anf_graph_after_build_df_graph.ir", anf_graph, true, kWholeStack);
  }
#endif

  // The same graph is returned whether or not AIR is being exported. export air can't use
  // session->AddGraph, it will cause atc error, so nothing beyond AddDFGraph is done here.
  return anf_graph;
}
1506
GetNodeInfo(const AnfNodeWeakPtr & node)1507 static inline std::string GetNodeInfo(const AnfNodeWeakPtr &node) {
1508 auto input_node = node.lock();
1509 MS_EXCEPTION_IF_NULL(input_node);
1510 return input_node->DebugString();
1511 }
1512
GenerateInputGeTensor(const KernelGraphPtr & kernel_graph) const1513 std::vector<GeTensor> GeGraphExecutor::GenerateInputGeTensor(const KernelGraphPtr &kernel_graph) const {
1514 MS_EXCEPTION_IF_NULL(kernel_graph);
1515 std::vector<GeTensor> ge_inputs;
1516 auto iter = input_datas_.find(kernel_graph.get());
1517 if (iter == input_datas_.end()) {
1518 return ge_inputs;
1519 }
1520 bool is_dynamic_shape = kernel_graph->is_dynamic_shape();
1521 const auto &input_datas = iter->second.ge_inputs;
1522 ge_inputs = input_datas;
1523 for (size_t i = 0; i < iter->second.device_addrs.size(); ++i) {
1524 auto output_addr = iter->second.device_addrs[i];
1525 MS_EXCEPTION_IF_NULL(output_addr);
1526 if (is_dynamic_shape) {
1527 auto ge_tensor_desc = transform::TransformUtil::GetGeTensorDesc(output_addr->kernel_tensor()->GetShapeVector(),
1528 output_addr->type_id(), output_addr->format());
1529 MS_EXCEPTION_IF_NULL(ge_tensor_desc);
1530 ge_tensor_desc->SetPlacement(::ge::kPlacementDevice);
1531 (void)ge_inputs[i].SetTensorDesc(*ge_tensor_desc);
1532 }
1533 auto node_output_addr = output_addr->GetMutablePtr();
1534 if (node_output_addr == nullptr) {
1535 auto input_node = iter->second.need_update_input[i].first.lock();
1536 MS_EXCEPTION_IF_NULL(input_node);
1537 // alloc static memory for unused inputs
1538 // error in ge when set nullptr into ge tensor
1539 std::vector<size_t> shape = Convert2SizeT(common::AnfAlgo::GetOutputInferShape(input_node, 0));
1540 size_t type_size = GetTypeByte(TypeIdToType(common::AnfAlgo::GetOutputInferDataType(input_node, 0)));
1541 size_t memory_size = std::accumulate(shape.begin(), shape.end(), type_size, std::multiplies<size_t>{});
1542 MS_EXCEPTION_IF_NULL(ResManager());
1543 auto memory = ResManager()->AllocateMemory(memory_size);
1544 output_addr->set_ptr(memory);
1545 output_addr->SetSize(memory_size);
1546 if (common::IsNeedProfileMemory()) {
1547 MS_LOG(WARNING) << "Need Profile Memory, alloc type: UnusedInput, size:" << memory_size
1548 << ", graph: " << kernel_graph->ToString() << ", node: " << input_node->fullname_with_scope()
1549 << ", device address addr: " << memory;
1550 }
1551 UpdateTracker("UnusedInput", input_node->fullname_with_scope(), kernel_graph->ToString(), memory_size, memory,
1552 device::tracker::MemType::kOther);
1553 }
1554 MS_LOG(INFO) << "[ZeroCopy] For Graph " << kernel_graph->ToString() << ", update input "
1555 << GetNodeInfo(iter->second.need_update_input[i].first) << " address to "
1556 << output_addr->GetMutablePtr() << ", shape:" << output_addr->kernel_tensor()->GetShapeVector()
1557 << ", type: " << TypeIdToString(output_addr->type_id()) << ", format: " << output_addr->format()
1558 << ", memory size: " << output_addr->GetSize();
1559 if (node_output_addr != ge_inputs[i].GetData() || output_addr->GetSize() != ge_inputs[i].GetSize()) {
1560 (void)ge_inputs[i].SetData(static_cast<uint8_t *>(node_output_addr), output_addr->GetSize(), [](void *) {});
1561 }
1562 }
1563 return ge_inputs;
1564 }
1565
GenerateOutputGeTensor(const KernelGraphPtr & kernel_graph) const1566 std::vector<GeTensor> GeGraphExecutor::GenerateOutputGeTensor(const KernelGraphPtr &kernel_graph) const {
1567 MS_EXCEPTION_IF_NULL(kernel_graph);
1568 std::vector<GeTensor> ge_outputs;
1569 auto iter = output_datas_.find(kernel_graph.get());
1570 if (iter == output_datas_.end()) {
1571 return ge_outputs;
1572 }
1573 const auto &output_datas = iter->second.ge_outputs;
1574 ge_outputs = output_datas;
1575
1576 bool is_dynamic_shape = kernel_graph->is_dynamic_shape();
1577 for (size_t idx = 0; idx < iter->second.device_addrs.size(); ++idx) {
1578 if (is_dynamic_shape) {
1579 ge_outputs[idx].SetData(nullptr, 0U, [](void *) {});
1580 continue;
1581 }
1582 auto output_node = iter->second.graph_outputs[idx].first.lock();
1583 auto index = iter->second.graph_outputs[idx].second;
1584 MS_EXCEPTION_IF_NULL(output_node);
1585 MS_EXCEPTION_IF_CHECK_FAIL(
1586 idx < ge_outputs.size(),
1587 "GenerateOutputGeTensor idx is greater equal than ge_outputs size, idx: " + std::to_string(idx) +
1588 ", ge outputs size: " + std::to_string(ge_outputs.size()) + ", kernel graph: " + kernel_graph->ToString());
1589 auto output_device_addr = iter->second.device_addrs[idx];
1590 auto node_output_device_addr = output_device_addr->GetMutablePtr();
1591 MS_LOG(INFO) << "Output addr " << node_output_device_addr;
1592 if (node_output_device_addr == nullptr) {
1593 MS_LOG(EXCEPTION) << "Output " << output_node->fullname_with_scope() << ", index: " << index
1594 << " address is nullptr, kernel graph: " << kernel_graph->ToString()
1595 << ", addr memory size: " << output_device_addr->GetSize()
1596 << "\n Maybe memory is not enough, memory statistics:"
1597 << AscendMemAdapter::GetInstance().DevMemStatistics();
1598 }
1599 MS_LOG(INFO) << "[ZeroCopy] For Graph " << kernel_graph->ToString() << ", update output "
1600 << output_node->DebugString() << " out_idx " << index << " address to "
1601 << output_device_addr->GetMutablePtr()
1602 << ", shape:" << output_device_addr->kernel_tensor()->GetShapeVector()
1603 << ", type: " << TypeIdToString(output_device_addr->type_id())
1604 << ", format: " << output_device_addr->format() << ", memory size: " << output_device_addr->GetSize();
1605 if (node_output_device_addr != ge_outputs[idx].GetData() ||
1606 output_device_addr->GetSize() != ge_outputs[idx].GetSize()) {
1607 (void)ge_outputs[idx].SetData(reinterpret_cast<uint8_t *>(node_output_device_addr), output_device_addr->GetSize(),
1608 [](void *) {});
1609 }
1610 }
1611 return ge_outputs;
1612 }
1613
RunInitGraph(const std::string & graph_name)1614 void GeGraphExecutor::RunInitGraph(const std::string &graph_name) {
1615 transform::RunOptions run_options;
1616 run_options.name = "init_subgraph." + graph_name;
1617 if (transform::GetGraphByName(run_options.name) == nullptr) {
1618 MS_LOG(INFO) << "Can not find " << run_options.name << " sub graph, don't need data init subgraph in INFER mode.";
1619 return;
1620 }
1621 auto graph_runner = transform::GetGraphRunner();
1622 if (graph_runner == nullptr) {
1623 MS_LOG(EXCEPTION) << "Can not found GraphRunner.";
1624 }
1625
1626 auto cur_sink_size = CurGraphSinkSize(graph_name);
1627 if (pre_sink_size_ == cur_sink_size) {
1628 return;
1629 }
1630 pre_sink_size_ = cur_sink_size;
1631 MS_LOG(INFO) << "Start run init graph: " << run_options.name << ", sink size:" << cur_sink_size;
1632 std::vector<transform::GeTensorPtr> ge_outputs;
1633 std::vector<transform::GeTensorPtr> ge_tensors;
1634 {
1635 // Release GIL before calling into (potentially long-running) C++ code
1636 GilReleaseWithCheck gil_release;
1637 transform::Status ret = transform::RunGraph(graph_runner, run_options, ge_tensors, &ge_outputs);
1638 if (ret != transform::Status::SUCCESS) {
1639 MS_LOG(EXCEPTION) << "Exec " << run_options.name << " graph failed.";
1640 }
1641 MS_LOG(INFO) << "Exec " << run_options.name << " graph success.";
1642 }
1643 }
1644 } // namespace ascend
1645 } // namespace device
1646 } // namespace mindspore
1647