/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/framework/actor/kernel_actor.h"
#include "runtime/framework/actor/memory_manager_actor.h"
#include "runtime/framework/actor/output_actor.h"
#include "runtime/framework/actor/recorder_actor.h"
#include "runtime/framework/actor/debug_actor.h"
#include "mindrt/include/async/async.h"
#include "utils/log_adapter.h"

namespace mindspore {
namespace runtime {
void KernelActor::Init() {
  // Check the number of device contexts.
  if (device_contexts_.size() != device::kDeviceContextsNumOne) {
    MS_LOG(EXCEPTION) << "The device contexts number is wrong.";
  }

  // Set the number of dependent messages the actor must receive before running.
  running_dependent_msg_num_ = SizeToInt(input_datas_num_ + input_controls_num_);

  MS_EXCEPTION_IF_NULL(kernel_);
  real_input_num_ = AnfAlgo::GetInputTensorNum(kernel_);
  kernel_info_ = dynamic_cast<KernelInfo *>(kernel_->kernel_info());
  is_dynamic_shape_ = AnfAlgo::IsDynamicShape(kernel_);

  // Init the device tensors and kernel launch info.
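  // Note: memory_free_list_ is filled in the fixed order [inputs | outputs | workspaces | external references] and
  // memory_alloc_list_ in the order [outputs | workspaces]; the Fetch*DeviceTensor methods below rely on this layout
  // when they overwrite entries in place by index.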
  copy_input_device_tensors_.resize(real_input_num_);
  input_device_tensors_.resize(real_input_num_);
  for (auto &input_address : input_device_tensors_) {
    (void)memory_free_list_.emplace_back(input_address);
    (void)launch_info_.inputs_.emplace_back(std::make_shared<Address>());
  }
  MS_EXCEPTION_IF_NULL(kernel_info_);
  for (auto &output_address : kernel_info_->output_address_list()) {
    MS_EXCEPTION_IF_NULL(output_address);
    (void)output_device_tensors_.emplace_back(output_address.get());
    (void)memory_alloc_list_.emplace_back(output_address.get());
    (void)memory_free_list_.emplace_back(output_address.get());
    (void)launch_info_.outputs_.emplace_back(std::make_shared<Address>());
  }
  for (auto &workspace_address : kernel_info_->workspace_address_list()) {
    MS_EXCEPTION_IF_NULL(workspace_address);
    (void)workspace_device_tensors_.emplace_back(workspace_address.get());
    (void)memory_alloc_list_.emplace_back(workspace_address.get());
    (void)memory_free_list_.emplace_back(workspace_address.get());
    (void)launch_info_.workspaces_.emplace_back(std::make_shared<Address>());
  }
  for (auto &external_reference_tensor : external_reference_tensors_) {
    (void)memory_free_list_.emplace_back(external_reference_tensor);
  }

  // Init the output data.
  output_data_by_output_index_.resize(output_device_tensors_.size());
  for (auto &data_arrow : output_data_arrows_) {
    MS_EXCEPTION_IF_NULL(data_arrow);
    if (IntToSize(data_arrow->from_output_index_) >= output_device_tensors_.size()) {
      MS_LOG(EXCEPTION) << "The output index is out of range: " << GetAID().Name();
    }
    auto device_address = output_device_tensors_[data_arrow->from_output_index_];
    auto data =
      std::make_unique<OpData<DeviceTensor>>(data_arrow->to_op_id_, device_address, data_arrow->to_input_index_);
    (void)output_data_.emplace_back(data.get());
    (void)output_data_by_output_index_[data_arrow->from_output_index_].emplace_back(std::move(data));
  }
}

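// Receives one piece of input data from an upstream actor; once all data and control inputs of the current step have
// arrived, allocates the required device memory and launches the kernel.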
void KernelActor::RunOpData(OpData<DeviceTensor> *const input_data, OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);

  auto &sequential_num = context->sequential_num_;
  (void)input_op_datas_[sequential_num].emplace_back(input_data);
  if (input_data->data_ == nullptr) {
    std::string error_info =
      "Input data of actor:" + GetAID().Name() + " num:" + std::to_string(input_data->index_) + " is empty";
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
  }
  // When all the inputs are collected, allocate memory and then launch the kernel.
  if (CheckRunningCondition(context)) {
    // Infer the kernel shape and update the abstract info for the dynamic shape kernel.
    if (is_dynamic_shape_) {
      device_contexts_[0]->UpdateDynamicShape(kernel_);
    }

    FetchInputDeviceTensor(context);
    FetchOutputDeviceTensor();
    if (memory_alloc_list_.size() > 0) {
      SendMemoryAllocReq(context);
    } else {
      OnMemoryAllocFinish(context);
    }
  }
}

void KernelActor::RunOpControl(AID *const input_control, OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);

  auto &sequential_num = context->sequential_num_;
  (void)input_op_controls_[sequential_num].emplace_back(input_control);
  // When all the inputs are collected, allocate memory and then launch the kernel.
  if (CheckRunningCondition(context)) {
    // Infer the kernel shape and update the abstract info for the dynamic shape kernel.
    if (is_dynamic_shape_) {
      device_contexts_[0]->UpdateDynamicShape(kernel_);
    }

    FetchInputDeviceTensor(context);
    FetchOutputDeviceTensor();
    if (memory_alloc_list_.size() > 0) {
      SendMemoryAllocReq(context);
    } else {
      OnMemoryAllocFinish(context);
    }
  }
}

void KernelActor::RunOpControlWithInputTensor(AID *const input_control, OpContext<DeviceTensor> *const context,
                                              const std::vector<TensorPtr> *input_tensors) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(input_tensors);
  auto &sequential_num = context->sequential_num_;
  (void)input_op_controls_[sequential_num].emplace_back(input_control);

  PushInputDeviceTensor(input_tensors);
  // When all the inputs are collected, allocate memory and then launch the kernel.
  if (CheckRunningCondition(context)) {
    FetchOutputDeviceTensor();
    if (memory_alloc_list_.size() > 0) {
      SendMemoryAllocReq(context);
    }
    OnMemoryAllocFinish(context);
  }
}

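// Helpers for the synchronous (GraphExecutionStrategy::kStep) path; in pipeline mode the same work is delegated to
// MemoryManagerActor via asynchronous messages.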
namespace {
void AllocateMemory(const std::vector<DeviceTensor *> &alloc_list, const DeviceContext *device_context,
                    OpContext<DeviceTensor> *const context, const std::string &actor_name) {
  MS_EXCEPTION_IF_NULL(device_context);
  MS_EXCEPTION_IF_NULL(context);

  for (auto &device_tensor : alloc_list) {
    MS_EXCEPTION_IF_NULL(device_tensor);
    if ((device_tensor->GetPtr() != nullptr) || (device_tensor->GetSize() == 0)) {
      continue;
    }
    // Allocate memory through the device context.
    if (!device_context->AllocateMemory(device_tensor, device_tensor->GetSize())) {
      SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kStep, *context, *device_context, actor_name,
                                                  device_tensor->GetSize());
    }
  }
}

void FreeMemory(const std::vector<DeviceTensor *> &free_list, const DeviceContext *device_context) {
  MS_EXCEPTION_IF_NULL(device_context);
  for (auto &device_tensor : free_list) {
    MS_EXCEPTION_IF_NULL(device_tensor);
    if (device_tensor->original_ref_count() == SIZE_MAX) {
      continue;
    }
    // Decrease the reference count; when it reaches zero, free the memory and reset the count to its original value.
    device_tensor->DecreaseRefCount();
    if (device_tensor->ref_count() == 0) {
      // Free memory through the device context.
      if (device_tensor->GetPtr() != nullptr) {
        device_context->FreeMemory(device_tensor);
      }
      device_tensor->ResetRefCount();
    }
  }
}
}  // namespace

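// Requests device memory for the kernel outputs and workspaces. In pipeline mode the request is sent to the memory
// manager actor, which replies with the allocation-finished callback (OnMemoryAllocFinish); otherwise the allocation
// is performed synchronously here.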
void KernelActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
  running_dependent_msg_num_ = 1;
  if (strategy_ == GraphExecutionStrategy::kPipeline) {
    Async(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &memory_alloc_list_, device_contexts_[0], context,
          GetAID());
  } else {
    AllocateMemory(memory_alloc_list_, device_contexts_[0], context, GetAID().Name());
  }
}

void KernelActor::SendMemoryFreeReq(OpContext<DeviceTensor> *const context) {
  if (strategy_ == GraphExecutionStrategy::kPipeline) {
    Async(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &memory_free_list_, device_contexts_[0], context);
  } else {
    FreeMemory(memory_free_list_, device_contexts_[0]);
  }
}

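// Called once the required memory is available: fills in the launch info, launches the kernel, and either hands off
// to the debug actor (pipeline mode with debugging enabled) or finishes the step directly.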
void KernelActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(kernel_);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);
  PreLaunchKernel(context);

  try {
    auto ret = device_contexts_[0]->LaunchKernel(kernel_, launch_info_.inputs_, launch_info_.workspaces_,
                                                 launch_info_.outputs_, is_dynamic_shape_);
    if (!ret) {
      std::string error_info = "Launch kernel failed: " + kernel_->fullname_with_scope();
      SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
    }
  } catch (const std::exception &e) {
    MsException::Instance().SetException();
    std::string error_info = "Launch kernel exception: " + kernel_->fullname_with_scope();
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
  }

  // The debug actor is blocking, so wait for its callback message before continuing.
  if (debug_aid_ != nullptr && strategy_ == GraphExecutionStrategy::kPipeline) {
    SendDebugReq(context);
    return;
  }

  PostLaunchKernel(context);
}

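// Sends the launch info to the debug actor and waits for its single callback message (OnDebugFinish) before the step
// is finished in PostLaunchKernel.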
void KernelActor::SendDebugReq(OpContext<DeviceTensor> *const context) {
  running_dependent_msg_num_ = 1;
  Async(*debug_aid_, &DebugActor::Debug, kernel_, &launch_info_, device_contexts_[0], context, &GetAID());
}

void KernelActor::OnDebugFinish(OpContext<DeviceTensor> *context) {
  MS_EXCEPTION_IF_NULL(context);
  PostLaunchKernel(context);
}

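// Takes the device addresses already attached to the given input tensors and uses them directly as the kernel inputs,
// keeping memory_free_list_ in sync.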
void KernelActor::PushInputDeviceTensor(const std::vector<TensorPtr> *input_tensors) {
  MS_EXCEPTION_IF_NULL(input_tensors);
  if (input_tensors->size() != real_input_num_) {
    MS_LOG(ERROR) << "Input tensor number: " << input_tensors->size()
                  << " is not equal to kernel's input size: " << real_input_num_;
    return;
  }

  for (size_t input_index = 0; input_index < input_tensors->size(); input_index++) {
    const auto &input_tensor = (*input_tensors)[input_index];
    MS_EXCEPTION_IF_NULL(input_tensor);
    const auto &device_tensor = std::dynamic_pointer_cast<DeviceTensor>(input_tensor->device_address());
    if (device_tensor != nullptr) {
      input_device_tensors_[input_index] = device_tensor.get();
      memory_free_list_[input_index] = device_tensor.get();
    }
  }
}

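// If the incoming device tensor belongs to a different device type than this actor's device context, copy its data
// into a locally owned device tensor of the matching type and use that as the kernel input.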
void KernelActor::CopyInputDeviceTensor(const OpData<DeviceTensor> *input_data,
                                        OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(input_data);
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);
  if ((input_data->data_ == nullptr) ||
      (input_data->data_->DeviceType() == device_contexts_[0]->GetDeviceAddressType())) {
    return;
  }

  MS_LOG(DEBUG) << "Copy from device type: " << input_data->data_->DeviceType()
                << " to device type: " << device_contexts_[0]->GetDeviceAddressType() << " in " << GetAID().Name();
  if (copy_input_device_tensors_[input_data->index_] == nullptr) {
    copy_input_device_tensors_[input_data->index_] = device_contexts_[0]->CreateDeviceAddress(
      nullptr, input_data->data_->GetSize(), input_data->data_->format(), input_data->data_->type_id());
  }
  // The size needs to be updated for dynamic shapes.
  copy_input_device_tensors_[input_data->index_]->SetSize(input_data->data_->GetSize());

  if (copy_input_device_tensors_[input_data->index_]->GetPtr() == nullptr) {
    if (!device_contexts_[0]->AllocateMemory(copy_input_device_tensors_[input_data->index_].get(),
                                             copy_input_device_tensors_[input_data->index_]->GetSize())) {
      SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, *context, *(device_contexts_[0]),
                                                  GetAID().Name(),
                                                  copy_input_device_tensors_[input_data->index_]->GetSize());
    }
  }

  if (!Copy(copy_input_device_tensors_[input_data->index_].get(), input_data->data_)) {
    std::string error_info = "Copy device tensor failed: " + GetAID().Name();
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
  }

  // Update by the copied input device tensor.
  input_device_tensors_[input_data->index_] = copy_input_device_tensors_[input_data->index_].get();
  memory_free_list_[input_data->index_] = copy_input_device_tensors_[input_data->index_].get();
}

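// Collects the kernel inputs of the current step from the arrived op data and from the device tensor store
// (persistent device tensors), updating memory_free_list_ accordingly.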
void KernelActor::FetchInputDeviceTensor(OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);

  const auto &data_iter = input_op_datas_.find(context->sequential_num_);
  if (data_iter != input_op_datas_.end()) {
    for (auto &input_data : data_iter->second) {
      MS_EXCEPTION_IF_NULL(input_data);
      if (IntToSize(input_data->index_) >= input_device_tensors_.size()) {
        SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), "The input index is out of range.");
      }

      if (input_device_tensors_[input_data->index_] != input_data->data_) {
        input_device_tensors_[input_data->index_] = input_data->data_;
        memory_free_list_[input_data->index_] = input_data->data_;
      }
      CopyInputDeviceTensor(input_data, context);
    }
  }

  for (auto &device_tensor_store_key : device_tensor_store_keys_) {
    auto device_tensor = DeviceTensorStore::GetInstance().Fetch(device_tensor_store_key.second.get(),
                                                                device_contexts_[0]->GetDeviceAddressType());
    if (device_tensor == nullptr) {
      std::string error_info =
        GetAID().Name() + " get device tensor store failed: " + device_tensor_store_key.second->fullname_with_scope() +
        ", device type:" + std::to_string(static_cast<int>(device_contexts_[0]->GetDeviceAddressType()));
      SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
    }

    if (device_tensor_store_key.first >= input_device_tensors_.size()) {
      SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), "The input index is out of range.");
    }
    if (input_device_tensors_[device_tensor_store_key.first] != device_tensor) {
      input_device_tensors_[device_tensor_store_key.first] = device_tensor;
      memory_free_list_[device_tensor_store_key.first] = device_tensor;
    }
  }
}

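// Refreshes the cached output device tensors from kernel_info_, since the output addresses and sizes may change for
// graph outputs or in the dynamic shape scenario.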
void KernelActor::FetchOutputDeviceTensor() {
  MS_EXCEPTION_IF_NULL(kernel_info_);
  auto &output_addresses = kernel_info_->output_address_list();
  const auto &kernel_mod = kernel_info_->kernel_mod();
  MS_EXCEPTION_IF_NULL(kernel_mod);
  const auto &output_size_list = kernel_mod->GetOutputSizeList();

  if (output_addresses.size() != output_size_list.size()) {
    MS_LOG(EXCEPTION) << "The number of output addresses is not equal to the number of output sizes.";
  }

  for (size_t i = 0; i < output_addresses.size(); ++i) {
    auto output_address = output_addresses[i].get();
    MS_EXCEPTION_IF_NULL(output_address);
    if (output_size_list[i] != output_address->GetSize()) {
      // The size of the output address may change in the dynamic shape scenario.
      output_address->SetSize(output_size_list[i]);
    }

    // When the tensor is a graph output or in the dynamic shape scenario, the output address may change.
    if (output_device_tensors_[i] != output_address) {
      output_device_tensors_[i] = output_address;
      memory_alloc_list_[i] = output_address;
      memory_free_list_[real_input_num_ + i] = output_address;

      // Update the output data.
      for (auto &output_data : output_data_by_output_index_[i]) {
        MS_EXCEPTION_IF_NULL(output_data);
        output_data->data_ = output_address;
      }
    }
  }
}

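// Fills launch_info_ with the raw device pointers and sizes of the input, output and workspace device tensors right
// before the kernel launch.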
void KernelActor::PreLaunchKernel(OpContext<DeviceTensor> *) {
  for (size_t i = 0; i < input_device_tensors_.size(); ++i) {
    MS_EXCEPTION_IF_NULL(input_device_tensors_[i]);
    MS_EXCEPTION_IF_NULL(launch_info_.inputs_[i]);
    launch_info_.inputs_[i]->addr = input_device_tensors_[i]->GetMutablePtr();
    launch_info_.inputs_[i]->size = input_device_tensors_[i]->GetSize();
  }

  for (size_t i = 0; i < output_device_tensors_.size(); ++i) {
    MS_EXCEPTION_IF_NULL(output_device_tensors_[i]);
    MS_EXCEPTION_IF_NULL(launch_info_.outputs_[i]);
    launch_info_.outputs_[i]->addr = output_device_tensors_[i]->GetMutablePtr();
    launch_info_.outputs_[i]->size = output_device_tensors_[i]->GetSize();
  }

  for (size_t i = 0; i < workspace_device_tensors_.size(); ++i) {
    MS_EXCEPTION_IF_NULL(workspace_device_tensors_[i]);
    MS_EXCEPTION_IF_NULL(launch_info_.workspaces_[i]);
    launch_info_.workspaces_[i]->addr = workspace_device_tensors_[i]->GetMutablePtr();
    launch_info_.workspaces_[i]->size = workspace_device_tensors_[i]->GetSize();
  }
}

void KernelActor::PostLaunchKernel(OpContext<DeviceTensor> *const context) {
  running_dependent_msg_num_ = SizeToInt(input_datas_num_ + input_controls_num_);

  // The inputs are no longer valid and need to be erased once the kernel launch finishes.
  EraseInput(context);

  // Note that SendMemoryFreeReq must come before SendOutput, because SendOutput triggers SendMemoryAllocReq of the
  // next actor and actors execute asynchronously. Keeping the current actor's SendMemoryFreeReq ahead of the next
  // actor's SendMemoryAllocReq both reuses memory more fully and preserves the execution order, avoiding illegal
  // memory timing problems.
  if (memory_free_list_.size() > 0) {
    SendMemoryFreeReq(context);
  }
  SendOutput(context);
}

void KernelActor::SendOutput(OpContext<DeviceTensor> *const context) const {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(kernel_);
  if (strategy_ == GraphExecutionStrategy::kStep) {
    return;
  }

  // The sends must follow this order: send result --> send data --> send control, to avoid illegal timing problems.
  // 1.Send graph output result.
  for (const auto &result_arrow : output_result_arrows_) {
    MS_EXCEPTION_IF_NULL(result_arrow);
    Async(result_arrow->to_op_id_, &OutputActor::CollectOutput, kernel_, result_arrow->from_output_index_,
          result_arrow->to_input_index_, context);
  }

  // 2.Send output data.
  for (auto &output_data : output_data_) {
    MS_EXCEPTION_IF_NULL(output_data);
    Async(output_data->op_id_, &OpActor::RunOpData, output_data, context);
  }

  // 3.Send output control.
  if (output_control_arrows_.size() > 0) {
    auto source_aid = const_cast<AID *>(&GetAID());
    for (auto &output_control : output_control_arrows_) {
      Async(output_control, &OpActor::RunOpControl, source_aid, context);
    }
  }

  // 4.Send recorder info.
  if (recorder_aid_ != nullptr) {
    Async(*recorder_aid_, &RecorderActor::RecordInfo, kernel_->fullname_with_scope(), &launch_info_,
          device_contexts_[0], context);
  }

  // No output.
  if ((output_data_arrows_.size() == 0) && (output_control_arrows_.size() == 0) &&
      (output_result_arrows_.size() == 0)) {
    SET_OPCONTEXT_SUCCESS_RET((*context));
  }
}
}  // namespace runtime
}  // namespace mindspore