/**
 * Copyright 2021-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/graph_scheduler/actor/output_actor.h"
#include "runtime/hardware/device_context_manager.h"
#include "utils/log_adapter.h"
#include "include/backend/distributed/recovery/recovery_context.h"
#include "include/backend/distributed/collective/collective_manager.h"
#include "include/backend/mem_reuse/mem_tracker.h"

namespace mindspore {
namespace runtime {
using distributed::collective::CollectiveManager;
using distributed::recovery::RecoveryContext;

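// Refresh each output tensor's shape from the inferred shape of its corresponding output node. MapTensor outputs
// are skipped.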
void UpdateOutputTensorShape(const std::vector<TensorPtr> &output_tensors,
                             const std::vector<KernelWithIndex> &output_nodes) {
  for (size_t i = 0; i < output_tensors.size(); ++i) {
    MS_EXCEPTION_IF_NULL(output_tensors[i]);
    if (output_tensors[i]->isa<tensor::MapTensor>()) {
      continue;
    }
    auto shape = common::AnfAlgo::GetOutputInferShape(output_nodes[i].first, output_nodes[i].second);
    (void)output_tensors[i]->set_shape(shape);
  }
}

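// For a dynamic sequence output node, rebuild the Tuple/List type on the kernel tensor so that its element count
// matches the re-inferred sequence shape. The element type comes from the kernel tensor's dtype, wrapped in
// TensorType when the element abstract is a tensor.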
void UpdateDynamicSequenceType(const AnfNodePtr &output_node, const kernel::KernelTensorPtr &output_kernel_tensor) {
  MS_EXCEPTION_IF_NULL(output_node);
  MS_EXCEPTION_IF_NULL(output_kernel_tensor);

  if (!common::AnfAlgo::IsDynamicSequence(output_node)) {
    return;
  }

  if (output_node->abstract() == nullptr || (!output_node->abstract()->isa<abstract::AbstractSequence>())) {
    MS_LOG(WARNING) << "Skip update type for output node:" << output_node->DebugString();
    return;
  }

  if (output_kernel_tensor->GetShape() == nullptr ||
      (!output_kernel_tensor->GetShape()->isa<abstract::SequenceShape>())) {
    MS_LOG(WARNING) << "Skip update type for output node:" << output_node->DebugString() << " as invalid shape:"
                    << (output_kernel_tensor->GetShape() == nullptr ? "nullptr"
                                                                    : output_kernel_tensor->GetShape()->ToString());
    return;
  }

  abstract::AbstractBasePtr element_abstract = nullptr;
  const auto &sequence_abstract = output_node->abstract()->cast<abstract::AbstractSequencePtr>();
  MS_EXCEPTION_IF_NULL(sequence_abstract);
  if (sequence_abstract->dynamic_len()) {
    if (sequence_abstract->dynamic_len_element_abs() != nullptr) {
      element_abstract = sequence_abstract->dynamic_len_element_abs();
    }
  } else if (sequence_abstract->size() != 0) {
    element_abstract = sequence_abstract->elements()[0];
  }

  TypePtr element_type = TypeIdToType(output_kernel_tensor->dtype_id());
  if (element_abstract != nullptr && element_abstract->isa<abstract::AbstractTensor>()) {
    element_type = std::make_shared<TensorType>(element_type);
  }

  const auto &sequence_shape = output_kernel_tensor->GetShape()->cast<abstract::SequenceShapePtr>();
  MS_EXCEPTION_IF_NULL(sequence_shape);
  TypePtrList types(sequence_shape->size(), element_type);
  if (sequence_abstract->isa<abstract::AbstractTuple>()) {
    output_kernel_tensor->SetType(std::make_shared<Tuple>(types));
    return;
  }
  output_kernel_tensor->SetType(std::make_shared<List>(types));
}

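// Check that the cached output containers are consistent and compute how many input messages this actor must
// receive before it can run.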
void OutputActor::Init() {
  // Check the number of device contexts.
  if (device_contexts_.size() != output_nodes_.size()) {
    MS_LOG(EXCEPTION) << "The device contexts number is wrong.";
  }
  // Check the number of outputs.
  if (output_nodes_.size() != outputs_.size()) {
    MS_LOG(EXCEPTION) << "The outputs number is wrong.";
  }
  // Check the number of output device tensors.
  if (outputs_.size() != output_device_tensors_.size()) {
    MS_LOG(EXCEPTION) << "The output device tensors number is wrong.";
  }

  // Set the number of dependent messages that must arrive before this actor runs.
  running_dependent_msg_num_ = SizeToInt(outputs_num_ - device_tensor_store_keys_.size());
}

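// Free the device memory held by the collected output nodes, skipping persisted output addresses, so it can be
// reallocated in the next step.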
void OutputActor::FreeOutputNodeMem() {
  for (size_t i = 0; i < output_nodes_.size(); ++i) {
    auto &output_node = output_nodes_[i].first;
    auto &output_device_tensor = output_device_tensors_[i];
    // The output_device_tensor may be repeated.
    if ((output_node == nullptr) || (output_device_tensor == nullptr) || (output_device_tensor->GetPtr() == nullptr)) {
      continue;
    }
    if (!IsOutputAddressPersisted(output_device_tensor, output_nodes_[i])) {
      FreeMemoryByDeviceContext(output_device_tensor, device_contexts_[i]);
    }
  }
}

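// Free the device memory of the summary nodes' outputs, skipping persisted addresses.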
void OutputActor::FreeSummaryNodeMem() {
  for (size_t i = 0; i < summary_nodes_.size(); ++i) {
    auto &summary_node = summary_nodes_[i].first;
    auto index = summary_nodes_[i].second;
    if (summary_node == nullptr) {
      continue;
    }
    auto output_device_addr = AnfAlgo::GetMutableOutputAddr(summary_node, index, false);
    if ((output_device_addr == nullptr) || (output_device_addr->GetPtr() == nullptr)) {
      continue;
    }
    if (!IsOutputAddressPersisted(output_device_addr.get(), summary_nodes_[i])) {
      FreeMemoryByDeviceContext(output_device_addr.get(), nullptr);
    }
  }
}

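// Reset all cached outputs, output nodes and output device tensors, and clear the loop counters.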
void OutputActor::ClearOutputCache() {
  output_node_to_tensor_device_address_.clear();
  outputs_.clear();
  outputs_.resize(outputs_num_);
  output_nodes_.clear();
  output_nodes_.resize(outputs_num_);
  output_device_tensors_.clear();
  output_device_tensors_.resize(outputs_num_);

  current_outputs_num_ = 0;
  current_count_ = 0;
}

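// Handle an input control arrow. On the last loop, gather the results of the device tensor store keys into the
// outputs and finish; otherwise free the output and summary node memory and send control arrows to trigger the
// next step.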
void OutputActor::RunOpControl(AID *const, OpContext<DeviceTensor> *const context) {
  ProfilerRecorder profiler(ProfilerModule::kRuntime, ProfilerEvent::kOutputProcess, GetAID().Name());
  MS_EXCEPTION_IF_NULL(context);
  ++current_count_;
  MS_LOG(DEBUG) << "Actor(" << GetAID().Name() << ") receive the input op control and current count:" << current_count_;

  // Trigger disaster recovery and return empty output.
  if (RecoveryContext::GetInstance()->enable_recovery() && CollectiveManager::instance()->need_reinit()) {
    FreeOutputNodeMem();
    ClearOutputCache();
    SET_OPCONTEXT_SUCCESS_RET((*context));
  }

  // The last loop.
  if (loop_count_ == current_count_) {
    if (current_outputs_num_ + device_tensor_store_keys_.size() != outputs_num_) {
      std::string error_info = "The outputs num is wrong, the total outputs num: " + std::to_string(outputs_num_) +
                               ", the current outputs num: " + std::to_string(current_outputs_num_) +
                               ", the device tensor store num: " + std::to_string(device_tensor_store_keys_.size());
      SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
    }

    // The device tensor store can't send data, so fetch its output results at the end of the run.
    for (const auto &device_tensor_store_key : device_tensor_store_keys_) {
      if (device_tensor_store_key.first >= outputs_.size()) {
        SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The input index is out of range.");
      }
      if (device_tensor_store_key.second != nullptr && device_tensor_store_key.second->isa<ValueNode>()) {
        const auto &value = device_tensor_store_key.second->cast<ValueNodePtr>()->value();
        MS_EXCEPTION_IF_NULL(value);
        if (value->isa<tensor::Tensor>()) {
          outputs_[device_tensor_store_key.first] = value->cast<tensor::TensorPtr>();
          continue;
        } else if (value->isa<Scalar>()) {
          outputs_[device_tensor_store_key.first] = ScalarToTensor(value->cast<ScalarPtr>());
          continue;
        } else {
          MS_LOG(DEBUG) << "Output value node:" << device_tensor_store_key.second->DebugString();
        }
      }
      outputs_[device_tensor_store_key.first] =
        CreateOutputTensor(device_tensor_store_key.second, 0, device_tensor_store_key.first, context);
      if (outputs_[device_tensor_store_key.first] == nullptr) {
        SET_OPCONTEXT_FAIL_RET_WITH_ERROR(*context, "Create output tensor failed.");
      }
      output_nodes_[device_tensor_store_key.first] = {device_tensor_store_key.second, 0};
      const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(device_tensor_store_key.second, 0, false);
      output_device_tensors_[device_tensor_store_key.first] = device_tensor.get();
    }

    current_outputs_num_ = 0;
    current_count_ = 0;
    SET_OPCONTEXT_SUCCESS_RET((*context));
  }

  // The output node may have a dynamic shape, so free its address here to allow a new address to be allocated with
  // the new shape and size in the next step.
  FreeOutputNodeMem();

  // Free the summary node input after usage.
  FreeSummaryNodeMem();

  // Send control arrows to trigger the next step.
  auto from_aid = const_cast<AID *>(&GetAID());
  for (auto &output_control : output_control_arrows_) {
    MS_EXCEPTION_IF_NULL(output_control);
    ActorDispatcher::Send(output_control->to_op_id_, &OpActor::RunOpControl, from_aid, context);
  }
}

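// Handle an input data arrow: record the output node and its device tensor at the given output position, and in
// the last loop create the corresponding host output tensor.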
void OutputActor::RunOpData(OpData<DeviceTensor> *const input_data, OpContext<DeviceTensor> *const context) {
  ProfilerRecorder profiler(ProfilerModule::kRuntime, ProfilerEvent::kOutputProcess, GetAID().Name());
  MS_EXCEPTION_IF_NULL(input_data);
  MS_EXCEPTION_IF_NULL(input_data->data_);
  MS_EXCEPTION_IF_NULL(context);
  MS_LOG(DEBUG) << "Actor(" << GetAID().Name()
                << ") receive the input op data and output position:" << input_data->index_
                << " device tensor:" << input_data->data_ << " ptr:" << input_data->data_->GetPtr()
                << " ref count:" << input_data->data_->ref_count()
                << " origin ref count:" << input_data->data_->original_ref_count()
                << " dynamic ref count:" << input_data->data_->dynamic_ref_count()
                << " from memory pool:" << input_data->data_->from_mem_pool() << " output node:"
                << (input_data->data_->GetNodeIndex().first == nullptr
                      ? "null"
                      : input_data->data_->GetNodeIndex().first->DebugString())
                << " index:" << input_data->data_->GetNodeIndex().second;
  auto output_position = IntToSize(input_data->index_);
  if (output_position >= outputs_.size()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The input index is out of range.");
  }
  // Save the output nodes and output device tensors.
  auto node_with_index = input_data->data_->GetNodeIndex();
  MS_EXCEPTION_IF_NULL(node_with_index.first);
  output_nodes_[output_position] = node_with_index;
  output_device_tensors_[output_position] = input_data->data_;

  // Collect the output result in the last loop, which is indicated by "loop_count_ - current_count_ == 1".
  if (loop_count_ - current_count_ != 1) {
    return;
  }

  auto tensor = CreateOutputTensor(node_with_index.first, node_with_index.second, output_position, context);
  if (tensor == nullptr) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR(*context, "Create output tensor failed.");
  }
  tensor->set_need_release_device_mem(true);
  outputs_[output_position] = tensor;
  current_outputs_num_++;
}

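// Create the host tensor for one graph output. For dynamic shape outputs the runtime pipeline is waited on first so
// the output shape is final; empty sequences and map tensors get dedicated handling, and for other outputs a device
// address is created (or reused from the cache) and attached to the host tensor.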
TensorPtr OutputActor::CreateOutputTensor(const AnfNodePtr &output_node, size_t output_index, size_t output_position,
                                          OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(output_node);
  bool is_dynamic_shape_output =
    common::AnfAlgo::IsDynamicShape(output_node) || common::AnfAlgo::IsDynamicSequence(output_node);
  // Wait for the pipeline for a dynamic shape output node if needed.
  // Note: In the dynamic shape case, when the actor thread number <= 3, maybe only async kernel launch is enabled,
  // so check whether async kernel launch is enabled rather than whether the multi pipeline is enabled here.
  if (ActorDispatcher::enable_async_launch_kernel() && is_dynamic_shape_output) {
    // Wait for all kernel launch tasks to finish so the output shape and size of compute-depend kernels are updated.
    bool is_computed_depend_kernel = false;
    if (!output_node->isa<CNode>()) {
      is_computed_depend_kernel = false;
    } else {
      auto kernel_mod = AnfAlgo::GetKernelMod(output_node);
      if (kernel_mod && kernel_mod->IsNeedUpdateOutputShapeAndSize()) {
        is_computed_depend_kernel = true;
      }
    }

    if (!WaitRuntimePipelineFinish(context, is_computed_depend_kernel)) {
      MS_LOG(INFO) << "Run graph failed, please check the error log.";
      return nullptr;
    }
  }

  const auto &output_kernel_tensor = AnfAlgo::GetOutputKernelTensor(output_node, output_index);
  MS_EXCEPTION_IF_NULL(output_kernel_tensor);
  MS_LOG(DEBUG) << "Create output tensor, output node: " << output_node->fullname_with_scope()
                << " debug string:" << output_node->DebugString() << ", output index: " << output_index
                << ", output position: " << output_position
                << ", output kernel tensor: " << output_kernel_tensor->ToString();

  // For dynamic sequence output, only the Shape has been re-inferred, not the Type (Tuple), so update the real
  // Tuple type into the kernel tensor to restore the tuple output.
  UpdateDynamicSequenceType(output_node, output_kernel_tensor);

  // If the output is an empty sequence, return an empty tensor directly.
  const auto &output_shape = output_kernel_tensor->GetShape();
  if (output_shape != nullptr && output_shape->isa<abstract::SequenceShape>() &&
      output_shape->cast<abstract::SequenceShapePtr>()->size() == 0) {
    ShapeVector shape = {0};
    TypeId type_id = (output_kernel_tensor->dtype_id() == TypeId::kTypeUnknown ? TypeId::kNumberTypeInt64
                                                                                : output_kernel_tensor->dtype_id());
    const auto &tensor = std::make_shared<tensor::Tensor>(type_id, shape);
    tensor->set_base_shape(output_shape);
    return tensor;
  }

  const auto &abstract = AnfAlgo::GetNodeAbstractByIndex(output_node, output_index);
  if (abstract != nullptr && abstract->isa<abstract::AbstractMapTensor>()) {
    return AnfAlgo::CreateMapTensor(output_node, output_index);
  }
  // Create the host tensor. The output tensor should use the inferred type; it will be handled correctly by tensor
  // data sync when the inferred type is not equal to the device type.
  auto type_id = common::AnfAlgo::GetOutputInferDataType(output_node, output_index);
  const auto &shape = output_kernel_tensor->GetShapeVector();
  auto tensor = std::make_shared<tensor::Tensor>(type_id, shape);
  MS_EXCEPTION_IF_NULL(tensor);
  // Set the tensor base shape for restoring the tuple output when the output node is a dynamic sequence.
  if (common::AnfAlgo::IsDynamicSequence(output_node)) {
    tensor->set_base_shape(output_kernel_tensor->GetShape());
  }

  if (output_position >= device_contexts_.size()) {
    MS_LOG(ERROR) << "The output position is out of range: " << output_position;
    return nullptr;
  }
  auto &device_context = device_contexts_[output_position];
  MS_EXCEPTION_IF_NULL(device_context);
  const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(output_node, output_index, false);
  MS_EXCEPTION_IF_NULL(device_tensor);
  device_tensor->set_padding_type(AnfAlgo::GetOutputReshapeType(output_node, output_index));
  if (device_context->GetDeviceType() != device_tensor->GetDeviceType()) {
    auto old_device_context = device_context;
    device_context = device::DeviceContextManager::GetInstance().GetOrCreateDeviceContext(
      {device_tensor->device_name(), device_tensor->device_id()});
    MS_LOG(INFO) << "Update device context from:" << old_device_context->GetDeviceType()
                 << " to:" << device_context->GetDeviceType();
  }

  // Create the device address and put it into the host tensor.
  if (output_node_to_tensor_device_address_.count({output_node, output_index}) > 0) {
    tensor->set_device_address(output_node_to_tensor_device_address_[{output_node, output_index}]);
  } else {
    auto kernel_tensor = std::make_shared<kernel::KernelTensor>(
      nullptr, device_tensor->GetSize(), kernel::GetFormatFromStrToEnum(device_tensor->format()),
      device_tensor->type_id(), device_tensor->host_shape(), device_context->device_context_key().device_name_,
      device_context->device_context_key().device_id_);
    kernel_tensor->SetType(output_kernel_tensor->GetType());
    kernel_tensor->SetShape(output_kernel_tensor->GetShape());
    kernel_tensor->set_stream_id(device_tensor->stream_id());
    // SetShape calculates a default size from the host shape, so set the real device size for special formats.
    kernel_tensor->set_size(device_tensor->GetSize());
    auto tensor_device_address = device_context->device_res_manager_->CreateDeviceAddress(kernel_tensor);
    MS_EXCEPTION_IF_NULL(tensor_device_address);
    MS_LOG(DEBUG) << "Create device tensor:" << tensor_device_address << ", size: " << kernel_tensor->size()
                  << " type:" << tensor_device_address->type_id()
                  << " output node:" << output_node->fullname_with_scope() << " output index:" << output_index
                  << " output position:" << output_position << ", origin output device tensor: " << device_tensor;
    tensor->set_device_address(tensor_device_address);
    output_node_to_tensor_device_address_[{output_node, output_index}] = tensor_device_address;
  }
  return tensor;
}

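// After the run, hand the device memory of each output node over to the corresponding host tensor: persisted
// addresses are copied into newly allocated memory, while other addresses are swapped into the tensor's device
// address directly.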
void OutputActor::UpdateOutputDeviceAddress() {
  ProfilerRecorder profiler(ProfilerModule::kRuntime, ProfilerEvent::kOutputProcess, "UpdateOutputDeviceAddress");
  // At the end of the run, when the device ptr of a graph output node is put into the host tensor, the graph output
  // node needs to be given a new device ptr, to avoid the device ptr held by the host tensor being rewritten in the
  // next step or next loop. The graph output nodes corresponding to the device tensor store are skipped, because
  // their addresses are fixed and persistent.
  device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, GetAID().Name(), "UpdateOutputDeviceAddress", "");
  for (size_t i = 0; i < output_nodes_.size(); ++i) {
    auto &output_node = output_nodes_[i].first;
    if (i >= output_device_tensors_.size()) {
      MS_LOG(EXCEPTION) << "Invalid index:" << i << " current:" << output_device_tensors_.size();
    }
    auto device_tensor = output_device_tensors_[i];
    if (output_node == nullptr || device_tensor == nullptr) {
      MS_LOG(INFO) << "The output node or device tensor is nullptr, need to check whether this affects the result.";
      continue;
    }

    auto &tensor = outputs_[i];
    MS_EXCEPTION_IF_NULL(tensor);
    if (tensor->base_shape_ptr() != nullptr && tensor->base_shape_ptr()->isa<abstract::SequenceShape>() &&
        tensor->base_shape_ptr()->cast<abstract::SequenceShapePtr>()->size() == 0) {
      continue;
    }
    auto tensor_device_address = std::dynamic_pointer_cast<DeviceTensor>(tensor->device_address());
    MS_EXCEPTION_IF_NULL(tensor_device_address);
    // Update the tensor device address by the device tensor of the output node.
    tensor_device_address->set_original_ref_count(SIZE_MAX);
    tensor_device_address->ResetRefCount();
    tensor_device_address->set_dynamic_ref_count(INT32_MAX);
    auto node_with_index = device_tensor->GetNodeIndex();
    tensor_device_address->SetNodeIndex(node_with_index.first, node_with_index.second);
    tensor_device_address->set_from_persistent_mem(device_tensor->from_persistent_mem());
    tensor_device_address->set_host_shape(tensor->shape());
    // The outputs may share the same output node, so skip nodes that have already been processed.
    if (tensor_device_address->GetPtr() != nullptr) {
      continue;
    }

    auto device_context = device_contexts_[i];
    MS_EXCEPTION_IF_NULL(device_context);
    MS_EXCEPTION_IF_NULL(device_context->device_res_manager_);
    const auto &swap_manager = device_context->device_res_manager_->swap_manager();
    if (swap_manager != nullptr) {
      swap_manager->AddSwappableTensor(tensor_device_address);
    }
    // If the output node's address ptr can't be changed, allocate new device memory and copy the data.
    if (IsOutputAddressPersisted(device_tensor, output_nodes_[i])) {
      device::DynamicMemAllocatorDebugInfo::SetDebugInfo(GetAID().Name(), device::AllocatorType::kOther);
      device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddMemInfo, GetAID().Name(), device::tracker::MemType::kOther,
                                                     tensor_device_address->GetSize(), tensor_device_address.get());
      if (!device_context->device_res_manager_->AllocateMemory(tensor_device_address.get(), kDefaultStreamIndex)) {
        MS_LOG_WITH_NODE(EXCEPTION, output_node)
          << "Device(id:" << device_context->device_context_key().device_id_
          << ") memory isn't enough and alloc failed in output actor, kernel name: "
          << output_node->fullname_with_scope() << ", alloc size: " << tensor_device_address->GetSize() << "B.";
      }
      MS_LOG(DEBUG) << "Sync device data from device tensor: " << device_tensor
                    << ", to device tensor: " << tensor_device_address << ", size: " << device_tensor->GetSize();
      if (!tensor_device_address->SyncDeviceToDevice(device_tensor)) {
        MS_LOG_WITH_NODE(EXCEPTION, output_node)
          << "Sync device to device failed, device type: " << tensor_device_address->GetDeviceType()
          << ", output node: " << output_node->fullname_with_scope();
      }
    } else {
      MS_LOG(DEBUG) << "Swap ptr:" << device_tensor->GetPtr() << " from device tensor:" << device_tensor
                    << " device type:" << device_tensor->GetDeviceType() << " to:" << tensor_device_address
                    << " device type:" << tensor_device_address->GetDeviceType();
      // Move the device ptr from device_tensor to tensor_device_address.
      device_tensor->Swap(tensor_device_address.get());
      tensor_device_address->set_user_data(device_tensor->user_data());
    }
  }

  output_node_to_tensor_device_address_.clear();
  output_nodes_.clear();
  output_nodes_.resize(outputs_num_);
  output_device_tensors_.clear();
  output_device_tensors_.resize(outputs_num_);
}
}  // namespace runtime
}  // namespace mindspore