/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/framework/actor/kernel_actor.h"
#include "runtime/framework/actor/memory_manager_actor.h"
#include "runtime/framework/actor/output_actor.h"
#include "runtime/framework/actor/recorder_actor.h"
#include "runtime/framework/actor/debug_actor.h"
#include "mindrt/include/async/async.h"
#include "utils/log_adapter.h"

namespace mindspore {
namespace runtime {
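// Initialize the actor before running: check the device context, cache the kernel info, and build the device tensor
// lists (inputs, outputs, workspaces) together with the kernel launch info and the output data for the data arrows.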
void KernelActor::Init() {
  // Check device contexts number.
  if (device_contexts_.size() != device::kDeviceContextsNumOne) {
    MS_LOG(EXCEPTION) << "The device contexts number is wrong.";
  }

  // Set the number of actor running dependent messages.
  running_dependent_msg_num_ = SizeToInt(input_datas_num_ + input_controls_num_);

  MS_EXCEPTION_IF_NULL(kernel_);
  real_input_num_ = AnfAlgo::GetInputTensorNum(kernel_);
  kernel_info_ = dynamic_cast<KernelInfo *>(kernel_->kernel_info());
  is_dynamic_shape_ = AnfAlgo::IsDynamicShape(kernel_);

  // Init the device tensors and kernel launch info.
  copy_input_device_tensors_.resize(real_input_num_);
  input_device_tensors_.resize(real_input_num_);
  for (auto &input_address : input_device_tensors_) {
    (void)memory_free_list_.emplace_back(input_address);
    (void)launch_info_.inputs_.emplace_back(std::make_shared<Address>());
  }
  MS_EXCEPTION_IF_NULL(kernel_info_);
  for (auto &output_address : kernel_info_->output_address_list()) {
    MS_EXCEPTION_IF_NULL(output_address);
    (void)output_device_tensors_.emplace_back(output_address.get());
    (void)memory_alloc_list_.emplace_back(output_address.get());
    (void)memory_free_list_.emplace_back(output_address.get());
    (void)launch_info_.outputs_.emplace_back(std::make_shared<Address>());
  }
  for (auto &workspace_address : kernel_info_->workspace_address_list()) {
    MS_EXCEPTION_IF_NULL(workspace_address);
    (void)workspace_device_tensors_.emplace_back(workspace_address.get());
    (void)memory_alloc_list_.emplace_back(workspace_address.get());
    (void)memory_free_list_.emplace_back(workspace_address.get());
    (void)launch_info_.workspaces_.emplace_back(std::make_shared<Address>());
  }
  for (auto &external_reference_tensor : external_reference_tensors_) {
    (void)memory_free_list_.emplace_back(external_reference_tensor);
  }

  // Init the output data.
  output_data_by_output_index_.resize(output_device_tensors_.size());
  for (auto &data_arrow : output_data_arrows_) {
    MS_EXCEPTION_IF_NULL(data_arrow);
    if (IntToSize(data_arrow->from_output_index_) >= output_device_tensors_.size()) {
      MS_LOG(EXCEPTION) << "The output index is out of range: " << GetAID().Name();
    }
    auto device_address = output_device_tensors_[data_arrow->from_output_index_];
    auto data =
      std::make_unique<OpData<DeviceTensor>>(data_arrow->to_op_id_, device_address, data_arrow->to_input_index_);
    (void)output_data_.emplace_back(data.get());
    (void)output_data_by_output_index_[data_arrow->from_output_index_].emplace_back(std::move(data));
  }
}

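// Collect one input data message. Once all data and control inputs of this step have arrived, fetch the device
// tensors and request memory allocation, or launch directly when nothing needs to be allocated.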
void KernelActor::RunOpData(OpData<DeviceTensor> *const input_data, OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);

  auto &sequential_num = context->sequential_num_;
  (void)input_op_datas_[sequential_num].emplace_back(input_data);
  if (input_data->data_ == nullptr) {
    std::string error_info =
      "Input data of actor:" + GetAID().Name() + " num:" + std::to_string(input_data->index_) + " is empty";
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
  }
  // When all the inputs are collected, allocate memory and then launch the kernel.
  if (CheckRunningCondition(context)) {
    // Infer kernel shape and update abstract info for dynamic shape kernel.
    if (is_dynamic_shape_) {
      device_contexts_[0]->UpdateDynamicShape(kernel_);
    }

    FetchInputDeviceTensor(context);
    FetchOutputDeviceTensor();
    if (memory_alloc_list_.size() > 0) {
      SendMemoryAllocReq(context);
    } else {
      OnMemoryAllocFinish(context);
    }
  }
}

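// Collect one input control message and, like RunOpData, trigger memory allocation and kernel launch when the
// running condition is satisfied.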
void KernelActor::RunOpControl(AID *const input_control, OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);

  auto &sequential_num = context->sequential_num_;
  (void)input_op_controls_[sequential_num].emplace_back(input_control);
  // When all the inputs are collected, allocate memory and then launch the kernel.
  if (CheckRunningCondition(context)) {
    // Infer kernel shape and update abstract info for dynamic shape kernel.
    if (is_dynamic_shape_) {
      device_contexts_[0]->UpdateDynamicShape(kernel_);
    }

    FetchInputDeviceTensor(context);
    FetchOutputDeviceTensor();
    if (memory_alloc_list_.size() > 0) {
      SendMemoryAllocReq(context);
    } else {
      OnMemoryAllocFinish(context);
    }
  }
}

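// Collect one input control message together with host input tensors: the input device tensors are taken directly
// from the given tensors, and memory allocation and kernel launch are completed within this call.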
void KernelActor::RunOpControlWithInputTensor(AID *const input_control, OpContext<DeviceTensor> *const context,
                                              const std::vector<TensorPtr> *input_tensors) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(input_tensors);
  auto &sequential_num = context->sequential_num_;
  (void)input_op_controls_[sequential_num].emplace_back(input_control);

  PushInputDeviceTensor(input_tensors);
  // When all the inputs are collected, allocate memory and then launch the kernel.
  if (CheckRunningCondition(context)) {
    FetchOutputDeviceTensor();
    if (memory_alloc_list_.size() > 0) {
      SendMemoryAllocReq(context);
    }
    OnMemoryAllocFinish(context);
  }
}

namespace {
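// Synchronously allocate device memory for the device tensors in alloc_list, used by the step (non-pipeline)
// execution strategy.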
void AllocateMemory(const std::vector<DeviceTensor *> &alloc_list, const DeviceContext *device_context,
                    OpContext<DeviceTensor> *const context, const std::string &actor_name) {
  MS_EXCEPTION_IF_NULL(device_context);
  MS_EXCEPTION_IF_NULL(context);

  for (auto &device_tensor : alloc_list) {
    MS_EXCEPTION_IF_NULL(device_tensor);
    if ((device_tensor->GetPtr() != nullptr) || (device_tensor->GetSize() == 0)) {
      continue;
    }
    // Allocate memory through the device context.
    if (!device_context->AllocateMemory(device_tensor, device_tensor->GetSize())) {
      SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kStep, *context, *device_context, actor_name,
                                                  device_tensor->GetSize());
    }
  }
}

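// Synchronously decrease the reference counts of the device tensors in free_list and free the memory of those whose
// count reaches zero, used by the step (non-pipeline) execution strategy.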
void FreeMemory(const std::vector<DeviceTensor *> &free_list, const DeviceContext *device_context) {
  MS_EXCEPTION_IF_NULL(device_context);
  for (auto &device_tensor : free_list) {
    MS_EXCEPTION_IF_NULL(device_tensor);
    if (device_tensor->original_ref_count() == SIZE_MAX) {
      continue;
    }
    // Decrease the reference count; when it reaches zero, free the memory and reset the count to the original value.
    device_tensor->DecreaseRefCount();
    if (device_tensor->ref_count() == 0) {
      // Free memory through the device context.
      if (device_tensor->GetPtr() != nullptr) {
        device_context->FreeMemory(device_tensor);
      }
      device_tensor->ResetRefCount();
    }
  }
}
}  // namespace

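// Request device memory for the kernel outputs and workspaces: asynchronously through the memory manager actor in
// pipeline mode, or synchronously in step mode.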
void KernelActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
  running_dependent_msg_num_ = 1;
  if (strategy_ == GraphExecutionStrategy::kPipeline) {
    Async(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &memory_alloc_list_, device_contexts_[0], context,
          GetAID());
  } else {
    AllocateMemory(memory_alloc_list_, device_contexts_[0], context, GetAID().Name());
  }
}

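// Release the device memory used by this launch: asynchronously through the memory manager actor in pipeline mode,
// or synchronously in step mode.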
void KernelActor::SendMemoryFreeReq(OpContext<DeviceTensor> *const context) {
  if (strategy_ == GraphExecutionStrategy::kPipeline) {
    Async(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &memory_free_list_, device_contexts_[0], context);
  } else {
    FreeMemory(memory_free_list_, device_contexts_[0]);
  }
}

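// Callback after memory allocation finishes: fill in the launch info, launch the kernel, then either hand over to
// the debug actor or run the post-launch processing directly.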
void KernelActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(kernel_);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);
  PreLaunchKernel(context);

  try {
    auto ret = device_contexts_[0]->LaunchKernel(kernel_, launch_info_.inputs_, launch_info_.workspaces_,
                                                 launch_info_.outputs_, is_dynamic_shape_);
    if (!ret) {
      std::string error_info = "Launch kernel failed: " + kernel_->fullname_with_scope();
      SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
    }
  } catch (const std::exception &e) {
    MsException::Instance().SetException();
    std::string error_info = "Launch kernel exception: " + kernel_->fullname_with_scope();
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
  }

  // The debug actor is blocking: wait for its callback message before continuing.
  if (debug_aid_ != nullptr && strategy_ == GraphExecutionStrategy::kPipeline) {
    SendDebugReq(context);
    return;
  }

  PostLaunchKernel(context);
}

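// Ask the debug actor to check the launched kernel; PostLaunchKernel continues in OnDebugFinish after the debug
// actor replies.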
void KernelActor::SendDebugReq(OpContext<DeviceTensor> *const context) {
  running_dependent_msg_num_ = 1;
  Async(*debug_aid_, &DebugActor::Debug, kernel_, &launch_info_, device_contexts_[0], context, &GetAID());
}

void KernelActor::OnDebugFinish(OpContext<DeviceTensor> *context) {
  MS_EXCEPTION_IF_NULL(context);
  PostLaunchKernel(context);
}

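// Fill the input device tensors and the memory free list directly from the device addresses of the given host
// tensors.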
void KernelActor::PushInputDeviceTensor(const std::vector<TensorPtr> *input_tensors) {
  MS_EXCEPTION_IF_NULL(input_tensors);
  if (input_tensors->size() != real_input_num_) {
    MS_LOG(ERROR) << "Input tensor number: " << input_tensors->size()
                  << " is not equal to kernel's input size: " << real_input_num_;
    return;
  }

  for (size_t input_index = 0; input_index < input_tensors->size(); input_index++) {
    const auto &input_tensor = (*input_tensors)[input_index];
    MS_EXCEPTION_IF_NULL(input_tensor);
    const auto &device_tensor = std::dynamic_pointer_cast<DeviceTensor>(input_tensor->device_address());
    if (device_tensor != nullptr) {
      input_device_tensors_[input_index] = device_tensor.get();
      memory_free_list_[input_index] = device_tensor.get();
    }
  }
}

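// If an input device tensor comes from a different device type, copy it into a device tensor of this actor's device
// so that the kernel can consume it.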
void KernelActor::CopyInputDeviceTensor(const OpData<DeviceTensor> *input_data,
                                        OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(input_data);
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);
  if ((input_data->data_ == nullptr) ||
      (input_data->data_->DeviceType() == device_contexts_[0]->GetDeviceAddressType())) {
    return;
  }

  MS_LOG(DEBUG) << "Copy from device type: " << input_data->data_->DeviceType()
                << " to device type: " << device_contexts_[0]->GetDeviceAddressType() << " in " << GetAID().Name();
  if (copy_input_device_tensors_[input_data->index_] == nullptr) {
    copy_input_device_tensors_[input_data->index_] = device_contexts_[0]->CreateDeviceAddress(
      nullptr, input_data->data_->GetSize(), input_data->data_->format(), input_data->data_->type_id());
  }
  // The dynamic shape scenario needs to update the size.
  copy_input_device_tensors_[input_data->index_]->SetSize(input_data->data_->GetSize());

  if (copy_input_device_tensors_[input_data->index_]->GetPtr() == nullptr) {
    if (!device_contexts_[0]->AllocateMemory(copy_input_device_tensors_[input_data->index_].get(),
                                             copy_input_device_tensors_[input_data->index_]->GetSize())) {
      SET_OPCONTEXT_MEMORY_ALLOC_FAIL_BY_STRATEGY(GraphExecutionStrategy::kPipeline, *context, *(device_contexts_[0]),
                                                  GetAID().Name(),
                                                  copy_input_device_tensors_[input_data->index_]->GetSize());
    }
  }

  if (!Copy(copy_input_device_tensors_[input_data->index_].get(), input_data->data_)) {
    std::string error_info = "Copy device tensor failed: " + GetAID().Name();
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
  }

  // Update the input with the copied device tensor.
  input_device_tensors_[input_data->index_] = copy_input_device_tensors_[input_data->index_].get();
  memory_free_list_[input_data->index_] = copy_input_device_tensors_[input_data->index_].get();
}

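// Collect the input device tensors of this step from the received input data and the device tensor store, keeping
// the memory free list consistent with them.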
void KernelActor::FetchInputDeviceTensor(OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);

  const auto &data_iter = input_op_datas_.find(context->sequential_num_);
  if (data_iter != input_op_datas_.end()) {
    for (auto &input_data : data_iter->second) {
      MS_EXCEPTION_IF_NULL(input_data);
      if (IntToSize(input_data->index_) >= input_device_tensors_.size()) {
        SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), "The input index is out of range.");
      }

      if (input_device_tensors_[input_data->index_] != input_data->data_) {
        input_device_tensors_[input_data->index_] = input_data->data_;
        memory_free_list_[input_data->index_] = input_data->data_;
      }
      CopyInputDeviceTensor(input_data, context);
    }
  }

  for (auto &device_tensor_store_key : device_tensor_store_keys_) {
    auto device_tensor = DeviceTensorStore::GetInstance().Fetch(device_tensor_store_key.second.get(),
                                                                device_contexts_[0]->GetDeviceAddressType());
    if (device_tensor == nullptr) {
      std::string error_info =
        GetAID().Name() + " get device tensor store failed: " + device_tensor_store_key.second->fullname_with_scope() +
        ", device type:" + std::to_string(static_cast<int>(device_contexts_[0]->GetDeviceAddressType()));
      SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), error_info);
    }

    if (device_tensor_store_key.first >= input_device_tensors_.size()) {
      SET_OPCONTEXT_FAIL_RET_WITH_ERROR_BY_STRATEGY(strategy_, (*context), "The input index is out of range.");
    }
    if (input_device_tensors_[device_tensor_store_key.first] != device_tensor) {
      input_device_tensors_[device_tensor_store_key.first] = device_tensor;
      memory_free_list_[device_tensor_store_key.first] = device_tensor;
    }
  }
}

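// Refresh the output device tensors from the kernel info and synchronize their sizes with the kernel mod, updating
// the memory lists and the cached output data when an output address has changed.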
void KernelActor::FetchOutputDeviceTensor() {
  MS_EXCEPTION_IF_NULL(kernel_info_);
  auto &output_addresses = kernel_info_->output_address_list();
  const auto &kernel_mod = kernel_info_->kernel_mod();
  MS_EXCEPTION_IF_NULL(kernel_mod);
  const auto &output_size_list = kernel_mod->GetOutputSizeList();

  if (output_addresses.size() != output_size_list.size()) {
    MS_LOG(EXCEPTION) << "The outputs number is not equal.";
  }

  for (size_t i = 0; i < output_addresses.size(); ++i) {
    auto output_address = output_addresses[i].get();
    MS_EXCEPTION_IF_NULL(output_address);
    if (output_size_list[i] != output_address->GetSize()) {
      // The size of the output address may change in the dynamic shape scenario.
      output_address->SetSize(output_size_list[i]);
    }

    // When the tensor is a graph output or in the dynamic shape scenario, the output address may be changed.
    if (output_device_tensors_[i] != output_address) {
      output_device_tensors_[i] = output_address;
      memory_alloc_list_[i] = output_address;
      memory_free_list_[real_input_num_ + i] = output_address;

      // Update output data.
      for (auto &output_data : output_data_by_output_index_[i]) {
        MS_EXCEPTION_IF_NULL(output_data);
        output_data->data_ = output_address;
      }
    }
  }
}

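// Fill the kernel launch info with the addresses and sizes of the current input, output and workspace device tensors
// just before launching.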
void KernelActor::PreLaunchKernel(OpContext<DeviceTensor> *) {
  for (size_t i = 0; i < input_device_tensors_.size(); ++i) {
    MS_EXCEPTION_IF_NULL(input_device_tensors_[i]);
    MS_EXCEPTION_IF_NULL(launch_info_.inputs_[i]);
    launch_info_.inputs_[i]->addr = input_device_tensors_[i]->GetMutablePtr();
    launch_info_.inputs_[i]->size = input_device_tensors_[i]->GetSize();
  }

  for (size_t i = 0; i < output_device_tensors_.size(); ++i) {
    MS_EXCEPTION_IF_NULL(output_device_tensors_[i]);
    MS_EXCEPTION_IF_NULL(launch_info_.outputs_[i]);
    launch_info_.outputs_[i]->addr = output_device_tensors_[i]->GetMutablePtr();
    launch_info_.outputs_[i]->size = output_device_tensors_[i]->GetSize();
  }

  for (size_t i = 0; i < workspace_device_tensors_.size(); ++i) {
    MS_EXCEPTION_IF_NULL(workspace_device_tensors_[i]);
    MS_EXCEPTION_IF_NULL(launch_info_.workspaces_[i]);
    launch_info_.workspaces_[i]->addr = workspace_device_tensors_[i]->GetMutablePtr();
    launch_info_.workspaces_[i]->size = workspace_device_tensors_[i]->GetSize();
  }
}

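// Post-launch processing: reset the dependent message count, erase the inputs of this step, free memory and send the
// outputs to the downstream actors.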
void KernelActor::PostLaunchKernel(OpContext<DeviceTensor> *const context) {
  running_dependent_msg_num_ = SizeToInt(input_datas_num_ + input_controls_num_);

  // The inputs are no longer valid and need to be erased when the kernel launch finishes.
  EraseInput(context);

  // Note that SendMemoryFreeReq must come before SendOutput, because SendOutput triggers SendMemoryAllocReq of the
  // next actor and the actors execute asynchronously. It is therefore necessary to ensure that SendMemoryFreeReq of
  // the current actor runs before SendMemoryAllocReq of the next actor: this reuses memory more fully and also
  // preserves the execution order, avoiding illegal memory timing problems.
  if (memory_free_list_.size() > 0) {
    SendMemoryFreeReq(context);
  }
  SendOutput(context);
}

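// Send the results of this kernel to the downstream actors: graph output results first, then output data, then
// output controls, and finally the recorder info, in that fixed order.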
void KernelActor::SendOutput(OpContext<DeviceTensor> *const context) const {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(kernel_);
  if (strategy_ == GraphExecutionStrategy::kStep) {
    return;
  }

  // The execution order must be: send result --> send data --> send control, to avoid illegal timing problems.
  // 1.Send graph output result.
  for (const auto &result_arrow : output_result_arrows_) {
    MS_EXCEPTION_IF_NULL(result_arrow);
    Async(result_arrow->to_op_id_, &OutputActor::CollectOutput, kernel_, result_arrow->from_output_index_,
          result_arrow->to_input_index_, context);
  }

  // 2.Send output data.
  for (auto &output_data : output_data_) {
    MS_EXCEPTION_IF_NULL(output_data);
    Async(output_data->op_id_, &OpActor::RunOpData, output_data, context);
  }

  // 3.Send output control.
  if (output_control_arrows_.size() > 0) {
    auto source_aid = const_cast<AID *>(&GetAID());
    for (auto &output_control : output_control_arrows_) {
      Async(output_control, &OpActor::RunOpControl, source_aid, context);
    }
  }

  // 4.Send recorder info.
  if (recorder_aid_ != nullptr) {
    Async(*recorder_aid_, &RecorderActor::RecordInfo, kernel_->fullname_with_scope(), &launch_info_,
          device_contexts_[0], context);
  }

  // No output.
  if ((output_data_arrows_.size() == 0) && (output_control_arrows_.size() == 0) &&
      (output_result_arrows_.size() == 0)) {
    SET_OPCONTEXT_SUCCESS_RET((*context));
  }
}
}  // namespace runtime
}  // namespace mindspore