/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/graph_scheduler/actor/data_source_actor.h"
#include "runtime/graph_scheduler/actor/kernel_actor.h"
#include "runtime/graph_scheduler/actor/memory_manager_actor.h"
#include "runtime/graph_scheduler/actor/output_actor.h"
#include "runtime/graph_scheduler/actor/recorder_actor.h"
#include "runtime/graph_scheduler/actor/debug_actor.h"
#include "mindrt/include/async/async.h"
#include "utils/log_adapter.h"
#include "kernel/common_utils.h"
#include "mindspore/core/utils/ms_context.h"
#include "include/backend/mem_reuse/mem_tracker.h"

namespace mindspore {
namespace runtime {
void DataSourceActor::Init() {
  // Check device contexts number.
  if (device_contexts_.size() < device::kDeviceContextsNumOne) {
    MS_LOG(EXCEPTION) << "The device contexts number is wrong.";
  }

  InitOutputData();
}

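// Fetch the data of the current step: pop the buffer left over from the last step, refill the buffer from the member
// nodes and then request device memory for the new device tensors.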
void DataSourceActor::FetchData(OpContext<DeviceTensor> *const context) {
  MS_LOG(INFO) << "Data source actor(" << GetAID().Name() << ") fetches data.";
  MS_EXCEPTION_IF_NULL(context);
  device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, GetAID().Name(), GetAID().Name(), "");
  // Pop the data of the last step.
  if (!buffers_.empty()) {
    buffers_.pop();
  }

  // Construct device tensors from the member nodes and fill them into the buffers.
  FillDataBuffer();
  if (buffers_.size() == 0) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The data queue is empty.");
  }

  // Allocate memory for the device tensors.
  SendMemoryAllocReq(context);
}

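// Fill the output data sent to downstream actors with the device tensor at the corresponding position of the front
// buffer.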
void DataSourceActor::UpdateOutputData(OpData<DeviceTensor> *const output_data, const DataArrowPtr &data_arrow,
                                       const AnfNodePtr &output_node, OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(output_data);
  MS_EXCEPTION_IF_NULL(data_arrow);
  MS_EXCEPTION_IF_NULL(output_node);
  MS_EXCEPTION_IF_NULL(context);

  if (buffers_.size() == 0) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The data queue is empty.");
  }
  const auto &output_device_tensors = buffers_.front();

  auto position = FetchNodePosition({output_node, data_arrow->from_output_index_});
  // The host data source actor uses the node position, while the device data source actor uses the output index.
  auto output_position = (position != 0) ? position : IntToSize(data_arrow->from_output_index_);
  if (output_position >= output_device_tensors.size()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The output index is out of range.");
  }
  output_data->data_ = output_device_tensors[output_position];
}

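// Initialize the device queue data source actor: prepare the output data, collect the output kernel tensors of the
// data kernel, and record the dynamic shape flag and the launch stream.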
void DeviceQueueDataSourceActor::Init() {
  // Check device contexts number.
  if (device_contexts_.size() != device::kDeviceContextsNumOne) {
    MS_LOG(EXCEPTION) << "The device contexts number is wrong.";
  }

  InitOutputData();

  // Init kernel launch info.
  MS_EXCEPTION_IF_NULL(kernel_info_);
  const auto &output_addresses = kernel_info_->output_address_list();
  for (size_t i = 0; i < output_addresses.size(); ++i) {
    (void)output_kernel_tensors_.emplace_back(output_addresses[i]->kernel_tensor().get());
    if (recorder_aid_ != nullptr || debug_aid_ != nullptr) {
      mem_info_.outputs_.emplace_back(std::make_shared<Address>());
    }
  }

  is_dynamic_shape_ = common::AnfAlgo::IsDynamicShape(data_kernel_);
  stream_ = device_contexts_[0]->device_res_manager_->GetStream(kernel_info_->stream_id());
}

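// Fill the buffers with the output device tensors of the data kernel, resizing the kernel first in the dynamic shape
// case.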
void DeviceQueueDataSourceActor::FillDataBuffer() {
  MS_EXCEPTION_IF_NULL(kernel_info_);
  if (is_dynamic_shape_) {
    // For the GetNext dynamic shape case, the Resize method updates the output shape and output size in the kernel
    // tensor via the data item from MindData, so there is no need to infer the shape first.
    const auto &kernel_mod = kernel_info_->MutableKernelMod();
    MS_EXCEPTION_IF_NULL(kernel_mod);
    int ret = kernel_mod->Resize({}, output_kernel_tensors_);
    if (ret != kernel::KRET_OK) {
      MS_LOG_WITH_NODE(EXCEPTION, data_kernel_) << "Resize failed for kernel: " << data_kernel_->fullname_with_scope();
    }
  }

  // Construct device tensors.
  std::vector<DeviceTensor *> device_tensors;
  for (auto &device_tensor : kernel_info_->output_address_list()) {
    MS_EXCEPTION_IF_NULL(device_tensor);
    (void)device_tensors.emplace_back(device_tensor.get());
  }

  buffers_.push(device_tensors);
}

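// Request device memory for the newly filled device tensors, synchronously or asynchronously according to the
// dispatcher configuration.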
void DeviceQueueDataSourceActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
  auto &device_tensors = buffers_.back();
  if (ActorDispatcher::is_memory_allocation_sync()) {
    ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &device_tensors,
                              device_contexts_[0], context, GetAID());
    OnMemoryAllocFinish(context);
  } else {
    ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &device_tensors,
                          device_contexts_[0], context, GetAID());
  }
}

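// Ask the memory manager actor to free the device tensors of the front buffer.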
void DeviceQueueDataSourceActor::SendMemoryFreeReq(OpContext<DeviceTensor> *const context) {
  auto &device_tensors = buffers_.front();
  if (device_contexts_.empty()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Empty device contexts in device data source actor.");
  }
  if (ActorDispatcher::is_memory_free_sync()) {
    ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &device_tensors,
                              device_contexts_[0], context, GetAID());
  } else {
    ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &device_tensors, device_contexts_[0],
                          context, GetAID());
  }
}

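// After memory allocation, bind the allocated device pointers to the output kernel tensors and launch the data kernel
// to copy the data from the device queue.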
void DeviceQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(data_kernel_);
  MS_EXCEPTION_IF_CHECK_FAIL((!device_contexts_.empty()), "The device context doesn't exist.");
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);
  if (IsRunningFailed(context)) {
    return;
  }
  if (buffers_.size() == 0) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The data queue is empty.");
  }

  // Construct the outputs for launching the data kernel.
  auto &device_tensors = buffers_.back();
  if (output_kernel_tensors_.size() != device_tensors.size()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The outputs number is not equal to the device tensors number.");
  }
  for (size_t i = 0; i < device_tensors.size(); ++i) {
    MS_EXCEPTION_IF_NULL(output_kernel_tensors_[i]);
    MS_EXCEPTION_IF_NULL(device_tensors[i]);
    output_kernel_tensors_[i]->set_device_ptr(device_tensors[i]->GetMutablePtr());
    output_kernel_tensors_[i]->set_size(device_tensors[i]->GetSize());
    if (recorder_aid_ != nullptr || debug_aid_ != nullptr) {
      mem_info_.outputs_[i]->addr = device_tensors[i]->GetMutablePtr();
      mem_info_.outputs_[i]->size = device_tensors[i]->GetSize();
    }
  }

  if (debug_aid_ != nullptr) {
    ActorDispatcher::SendSync(*debug_aid_, &DebugActor::DebugPreLaunch, data_kernel_, std::vector<DeviceTensor *>(),
                              device_tensors, device_contexts_[0], context, &GetAID());
  }

  // Copy data from the device queue by launching the data kernel.
  MS_EXCEPTION_IF_NULL(kernel_info_);
  try {
    uint64_t start_time = 0;
    PROFILER_START(start_time);
    auto kernel_mod = AnfAlgo::GetKernelMod(data_kernel_);
    auto ret = device_contexts_[0]->GetKernelExecutor(false)->LaunchKernel(data_kernel_, {}, {}, output_kernel_tensors_,
                                                                           kernel_mod, stream_);
    PROFILER_END(start_time, ProfilerModule::kKernel, ProfilerEvent::kKernelLaunch, GetAID().Name(), false);
    if (!ret) {
      std::string error_info = "Launch kernel failed: " + data_kernel_->fullname_with_scope();
      SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
    }
  } catch (const std::exception &e) {
    MsException::Instance().SetException();
    std::string error_info = "Launch kernel exception: " + data_kernel_->fullname_with_scope();
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
  }

  // The debug actor is blocking, so the actor must wait for the debug actor's callback message before continuing.
  if (debug_aid_ != nullptr) {
    SendDebugReq(context);
    return;
  }

  PostRun(context);
}

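// Send the post-launch debug request and finish the step after the debug actor calls back.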
void DeviceQueueDataSourceActor::SendDebugReq(OpContext<DeviceTensor> *const context) {
  ActorDispatcher::SendSync(*debug_aid_, &DebugActor::DebugPostLaunch, data_kernel_, std::vector<DeviceTensor *>(),
                            buffers_.back(), device_contexts_[0], context, &GetAID());
  OnDebugFinish(context);
}

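// Send the memory info of the data kernel to the recorder actor if recording is enabled.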
void DeviceQueueDataSourceActor::SendRecorderInfo(OpContext<DeviceTensor> *const context) const {
  if (recorder_aid_ != nullptr && (!device_contexts_.empty())) {
    MS_EXCEPTION_IF_NULL(data_kernel_);
    ActorDispatcher::Send(*recorder_aid_, &RecorderActor::RecordInfo, data_kernel_->fullname_with_scope(), &mem_info_,
                          device_contexts_[0], context);
  }
}

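// Fill the buffers with the output device tensors of the host queue data nodes.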
void HostQueueDataSourceActor::FillDataBuffer() {
  // Construct device tensors.
  std::vector<DeviceTensor *> device_tensors;
  for (auto &node_with_index : data_node_with_indexs_) {
    MS_LOG(DEBUG) << "Node:" << node_with_index.first->DebugString() << " index:" << node_with_index.second;
    auto device_address = AnfAlgo::GetMutableOutputAddr(node_with_index.first, node_with_index.second, false);
    MS_EXCEPTION_IF_NULL(device_address);
    (void)device_tensors.emplace_back(device_address.get());
  }

  buffers_.push(device_tensors);
}

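// Request device memory for the newly filled device tensors. The batch allocation interface is used when the data
// nodes do not share the same device context.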
void HostQueueDataSourceActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
  if (device_contexts_.empty()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Empty device contexts in host data source actor.");
  }
  auto &device_tensors = buffers_.back();
  if (ActorDispatcher::is_memory_allocation_sync()) {
    if (IsSameDeviceType()) {
      ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &device_tensors,
                                device_contexts_[0], context, GetAID());
    } else {
      ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::AllocateBatchMemory, &device_tensors,
                                &device_contexts_, context, GetAID());
    }
    OnMemoryAllocFinish(context);
  } else {
    if (IsSameDeviceType()) {
      ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &device_tensors,
                            device_contexts_[0], context, GetAID());
    } else {
      ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::AllocateBatchMemory, &device_tensors,
                            &device_contexts_, context, GetAID());
    }
  }
}

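// Free the device tensors of the front buffer, using the batch interface when the data nodes do not share the same
// device context.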
void HostQueueDataSourceActor::SendMemoryFreeReq(OpContext<DeviceTensor> *const context) {
  if (device_contexts_.empty()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Empty device contexts in host data source actor.");
  }
  auto &device_tensors = buffers_.front();
  if (ActorDispatcher::is_memory_free_sync()) {
    if (IsSameDeviceType()) {
      ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &device_tensors,
                                device_contexts_[0], context, GetAID());
    } else {
      ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::FreeBatchMemory, &device_tensors,
                                &device_contexts_, context, GetAID());
    }
  } else {
    if (IsSameDeviceType()) {
      ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &device_tensors, device_contexts_[0],
                            context, GetAID());
    } else {
      ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::FreeBatchMemory, &device_tensors,
                            &device_contexts_, context, GetAID());
    }
  }
}

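// After memory allocation, copy each host tensor pulled from the host queue into its corresponding device tensor,
// either by a device-to-device copy when the host tensor already holds a device address, or by a host-to-device copy
// otherwise.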
void HostQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
  auto ms_context = MsContext::GetInstance();
  MS_EXCEPTION_IF_NULL(ms_context);
  MS_EXCEPTION_IF_NULL(context);
  if (IsRunningFailed(context)) {
    return;
  }
  if (buffers_.size() == 0) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The data queue is empty.");
  }

  // Get host tensors from the host queue and get device tensors from the buffers.
  MS_EXCEPTION_IF_NULL(host_queue_);
  if (host_queue_->IsEmpty()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Host data queue is empty.");
  }
  auto &host_tensors = host_queue_->Pull();
  auto &device_tensors = buffers_.back();
  if (host_tensors.size() != device_tensors.size()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context),
                                      "The length of host tensors is not equal to the length of device tensors.");
  }

  // Copy data from the host tensors to the device tensors.
  uint64_t start_time = 0;
  PROFILER_START(start_time);
  auto enable_async_copy = ms_context->IsEnableInferBoost() || IsTwoPhaseInfer();
  try {
    for (size_t i = 0; i < host_tensors.size(); ++i) {
      auto &host_tensor = host_tensors[i];
      auto &device_tensor = device_tensors[i];
      MS_EXCEPTION_IF_NULL(device_tensor);
      MS_EXCEPTION_IF_NULL(host_tensor);
      // Skip the device address that is not used.
      if (TEST_FLAG(device_tensor->flag(), device::kDeviceAddressFlagNotUsed)) {
        MS_LOG(DEBUG) << GetAID().Name() << " input index " << i << " is not used.";
        continue;
      }
      auto tensor_device_address = std::dynamic_pointer_cast<DeviceTensor>(host_tensor->device_address());
      // Sync data from the host tensor's device address to the device tensor.
      if (tensor_device_address != nullptr) {
        if (tensor_device_address.get() == device_tensor) {
          continue;
        }
        if (!Copy(device_tensor, tensor_device_address.get())) {
          SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Copy data failed.");
        }
        continue;
      }
      if (host_tensor->data_ptr() == nullptr && device_tensor->GetSize() == 0) {
        MS_LOG(INFO) << "Empty tuple sync";
        continue;
      }

      if (enable_async_copy) {
        if (!device_tensor->AsyncHostToDevice(LongToSize(host_tensor->data().nbytes()), host_tensor->data_type(),
                                              host_tensor->data_ptr()->data())) {
          SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "AsyncHostToDevice failed.");
        }
      } else {
        if (!device_tensor->SyncHostToDevice(
              trans::GetRuntimePaddingShape(data_node_with_indexs_[i].first, data_node_with_indexs_[i].second),
              LongToSize(host_tensor->data().nbytes()), host_tensor->data_type(),
              host_tensor->device_info().host_format_, host_tensor->data_ptr())) {
          SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "SyncHostToDevice failed.");
        }
      }

      if (IsDynamic(device_tensor->host_shape())) {
        device_tensor->set_host_shape(host_tensor->shape());
      }
    }
  } catch (const std::exception &e) {
    MsException::Instance().SetException();
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Host data source actor run exception.");
  }
  PROFILER_END(start_time, ProfilerModule::kRuntime, ProfilerEvent::kCopyData, GetAID().Name(), false);

  PostRun(context);
}

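// Return the buffer position of the given data node according to data_node_position_map_.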
size_t HostQueueDataSourceActor::FetchNodePosition(const KernelWithIndex &data_node) const {
  MS_EXCEPTION_IF_NULL(data_node.first);
  const auto &iter = data_node_position_map_.find(data_node);
  if (iter == data_node_position_map_.end()) {
    MS_LOG_WITH_NODE(EXCEPTION, data_node.first)
      << "Data node: " << data_node.first->DebugString() << " index:" << data_node.second << " does not exist.";
  }
  return iter->second;
}

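// Return the data node with index at the given buffer position.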
KernelWithIndex HostQueueDataSourceActor::FetchNode(size_t node_position) const {
  if (node_position >= data_node_with_indexs_.size()) {
    MS_LOG(EXCEPTION) << "The position of node is out of range: " << node_position;
  }
  return data_node_with_indexs_[node_position];
}

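// Return true when all device contexts of this actor are the same, so that the single-device memory interfaces can be
// used.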
bool HostQueueDataSourceActor::IsSameDeviceType() const {
  for (size_t i = 1; i < device_contexts_.size(); i++) {
    if (device_contexts_[i] != device_contexts_[0]) {
      return false;
    }
  }
  return true;
}

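// Release the data of the current step: pop the host queue tensors and rebuild the data node addresses that came from
// input tensors so that they no longer hold device memory.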
void HostQueueDataSourceActor::ReleaseData() {
  // At the end of the step, the host queue tensors need to be freed.
  MS_EXCEPTION_IF_NULL(host_queue_);
  host_queue_->Pop();

  // At the end of the step, the data node addresses need to be released.
  for (auto &data_node_with_index : data_node_with_indexs_) {
    if (!AnfAlgo::OutputAddrExist(data_node_with_index.first, data_node_with_index.second)) {
      continue;
    }
    auto old_address = AnfAlgo::GetMutableOutputAddr(data_node_with_index.first, data_node_with_index.second);
    MS_EXCEPTION_IF_NULL(old_address);
    if (old_address->GetPtr() == nullptr) {
      // The address memory has already been freed.
      continue;
    }
    // The address comes from an input tensor and is not used by the runtime.
    if (old_address->original_ref_count() == SIZE_MAX && !old_address->is_ptr_persisted()) {
      auto device_context = device::DeviceContextManager::GetInstance().GetOrCreateDeviceContext(
        {old_address->device_name(), old_address->device_id()});
      MS_EXCEPTION_IF_NULL(device_context);
      const auto &kernel_tensor = old_address->kernel_tensor();
      MS_EXCEPTION_IF_NULL(kernel_tensor);
      auto new_kernel_tensor = kernel_tensor->CloneKernelTensor();
      MS_EXCEPTION_IF_NULL(new_kernel_tensor);
      new_kernel_tensor->set_device_ptr(nullptr);

      auto new_address = device_context->device_res_manager_->CreateDeviceAddress(new_kernel_tensor);
      MS_EXCEPTION_IF_NULL(new_address);
      MS_LOG(DEBUG) << "Create device tensor:" << new_address << " type:" << new_address->type_id()
                    << ", kernel tensor addr:" << new_kernel_tensor.get();
      new_address->set_original_ref_count(old_address->original_ref_count());
      new_address->ResetRefCount();
      new_address->set_flag(old_address->flag());
      auto [node, index] = old_address->GetNodeIndex();
      new_address->SetNodeIndex(node, index);
      AnfAlgo::SetOutputAddr(new_address, data_node_with_index.second, data_node_with_index.first.get());
    }
  }
}
}  // namespace runtime
}  // namespace mindspore