/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "runtime/graph_scheduler/actor/data_source_actor.h"
#include "runtime/graph_scheduler/actor/kernel_actor.h"
#include "runtime/graph_scheduler/actor/memory_manager_actor.h"
#include "runtime/graph_scheduler/actor/output_actor.h"
#include "runtime/graph_scheduler/actor/recorder_actor.h"
#include "runtime/graph_scheduler/actor/debug_actor.h"
#include "mindrt/include/async/async.h"
#include "utils/log_adapter.h"
#include "kernel/common_utils.h"
#include "mindspore/core/utils/ms_context.h"
#include "include/backend/mem_reuse/mem_tracker.h"

namespace mindspore {
namespace runtime {
void DataSourceActor::Init() {
  // Check device contexts number.
  if (device_contexts_.size() < device::kDeviceContextsNumOne) {
    MS_LOG(EXCEPTION) << "The device contexts number is wrong.";
  }

  InitOutputData();
}

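// Fetches one step of data: drops the buffer left over from the previous step, refills the buffers with the
// device tensors of the data nodes, and then requests device memory for them.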
void DataSourceActor::FetchData(OpContext<DeviceTensor> *const context) {
  MS_LOG(INFO) << "Data source actor(" << GetAID().Name() << ") fetches data.";
  MS_EXCEPTION_IF_NULL(context);
  device::tracker::CALL_MEMORY_TRACKER_WITH_FILE(AddTask, GetAID().Name(), GetAID().Name(), "");
  // Pop the data from the previous step.
  if (!buffers_.empty()) {
    buffers_.pop();
  }

  // Construct device tensors from the member nodes and fill them into the buffers.
  FillDataBuffer();
  if (buffers_.size() == 0) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The data queue is empty.");
  }

  // Allocate memory for the device tensors.
  SendMemoryAllocReq(context);
}

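// Fills the output data sent along the given data arrow with the device tensor at the matching position of the
// front buffer before it flows to the downstream actor.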
void DataSourceActor::UpdateOutputData(OpData<DeviceTensor> *const output_data, const DataArrowPtr &data_arrow,
                                       const AnfNodePtr &output_node, OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(output_data);
  MS_EXCEPTION_IF_NULL(data_arrow);
  MS_EXCEPTION_IF_NULL(output_node);
  MS_EXCEPTION_IF_NULL(context);

  if (buffers_.size() == 0) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The data queue is empty.");
  }
  const auto &output_device_tensors = buffers_.front();

  auto position = FetchNodePosition({output_node, data_arrow->from_output_index_});
  // The host data source actor uses the node position, the device data source actor uses the output index.
  auto output_position = (position != 0) ? position : IntToSize(data_arrow->from_output_index_);
  if (output_position >= output_device_tensors.size()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The output index is out of range.");
  }
  output_data->data_ = output_device_tensors[output_position];
}

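// Besides the base checks, initialization of the device queue data source actor caches the kernel tensors of the
// data kernel outputs, records whether the data kernel has dynamic shape, and fetches the launch stream.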
void DeviceQueueDataSourceActor::Init() {
  // Check device contexts number.
  if (device_contexts_.size() != device::kDeviceContextsNumOne) {
    MS_LOG(EXCEPTION) << "The device contexts number is wrong.";
  }

  InitOutputData();

  // Init kernel launch info.
  MS_EXCEPTION_IF_NULL(kernel_info_);
  const auto &output_addresses = kernel_info_->output_address_list();
  for (size_t i = 0; i < output_addresses.size(); ++i) {
    (void)output_kernel_tensors_.emplace_back(output_addresses[i]->kernel_tensor().get());
    if (recorder_aid_ != nullptr || debug_aid_ != nullptr) {
      mem_info_.outputs_.emplace_back(std::make_shared<Address>());
    }
  }

  is_dynamic_shape_ = common::AnfAlgo::IsDynamicShape(data_kernel_);
  stream_ = device_contexts_[0]->device_res_manager_->GetStream(kernel_info_->stream_id());
}

void DeviceQueueDataSourceActor::FillDataBuffer() {
  MS_EXCEPTION_IF_NULL(kernel_info_);
  if (is_dynamic_shape_) {
    // For the dynamic-shape GetNext case, Resize updates the output shape and output size in the kernel tensors via
    // the data item from MindData, so there is no need to do infer shape first.
    const auto &kernel_mod = kernel_info_->MutableKernelMod();
    MS_EXCEPTION_IF_NULL(kernel_mod);
    int ret = kernel_mod->Resize({}, output_kernel_tensors_);
    if (ret != kernel::KRET_OK) {
      MS_LOG_WITH_NODE(EXCEPTION, data_kernel_) << "Resize failed for kernel: " << data_kernel_->fullname_with_scope();
    }
  }

  // Construct device tensors.
  std::vector<DeviceTensor *> device_tensors;
  for (auto &device_tensor : kernel_info_->output_address_list()) {
    MS_EXCEPTION_IF_NULL(device_tensor);
    (void)device_tensors.emplace_back(device_tensor.get());
  }

  buffers_.push(device_tensors);
}

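// Requests device memory for the current buffer from the memory manager actor, either synchronously (and then
// handles the result immediately in OnMemoryAllocFinish) or asynchronously via an actor message.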
void DeviceQueueDataSourceActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
  auto &device_tensors = buffers_.back();
  if (ActorDispatcher::is_memory_allocation_sync()) {
    ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &device_tensors,
                              device_contexts_[0], context, GetAID());
    OnMemoryAllocFinish(context);
  } else {
    ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &device_tensors,
                          device_contexts_[0], context, GetAID());
  }
}

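// Asks the memory manager actor to free the device memory of the oldest buffer (buffers_.front()).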
void DeviceQueueDataSourceActor::SendMemoryFreeReq(OpContext<DeviceTensor> *const context) {
  auto &device_tensors = buffers_.front();
  if (device_contexts_.empty()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Empty device contexts in device data source actor.");
  }
  if (ActorDispatcher::is_memory_free_sync()) {
    ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &device_tensors,
                              device_contexts_[0], context, GetAID());
  } else {
    ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &device_tensors, device_contexts_[0],
                          context, GetAID());
  }
}

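// Called after device memory has been allocated: binds the allocated device pointers to the kernel output tensors
// and launches the data kernel to copy the next batch from the device queue, then hands control to the debug actor
// (if any) or finishes the step with PostRun.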
void DeviceQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
  MS_EXCEPTION_IF_NULL(context);
  MS_EXCEPTION_IF_NULL(data_kernel_);
  MS_EXCEPTION_IF_CHECK_FAIL((!device_contexts_.empty()), "The device context doesn't exist.");
  MS_EXCEPTION_IF_NULL(device_contexts_[0]);
  if (IsRunningFailed(context)) {
    return;
  }
  if (buffers_.size() == 0) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The data queue is empty.");
  }

  // Construct the outputs for launching the data kernel.
  auto &device_tensors = buffers_.back();
  if (output_kernel_tensors_.size() != device_tensors.size()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The outputs number is not equal to the device tensors number.");
  }
  for (size_t i = 0; i < device_tensors.size(); ++i) {
    MS_EXCEPTION_IF_NULL(output_kernel_tensors_[i]);
    MS_EXCEPTION_IF_NULL(device_tensors[i]);
    output_kernel_tensors_[i]->set_device_ptr(device_tensors[i]->GetMutablePtr());
    output_kernel_tensors_[i]->set_size(device_tensors[i]->GetSize());
    if (recorder_aid_ != nullptr || debug_aid_ != nullptr) {
      mem_info_.outputs_[i]->addr = device_tensors[i]->GetMutablePtr();
      mem_info_.outputs_[i]->size = device_tensors[i]->GetSize();
    }
  }

  if (debug_aid_ != nullptr) {
    ActorDispatcher::SendSync(*debug_aid_, &DebugActor::DebugPreLaunch, data_kernel_, std::vector<DeviceTensor *>(),
                              device_tensors, device_contexts_[0], context, &GetAID());
  }

  // Copy data from the device queue by launching the data kernel.
  MS_EXCEPTION_IF_NULL(kernel_info_);
  try {
    uint64_t start_time = 0;
    PROFILER_START(start_time);
    auto kernel_mod = AnfAlgo::GetKernelMod(data_kernel_);
    auto ret = device_contexts_[0]->GetKernelExecutor(false)->LaunchKernel(data_kernel_, {}, {}, output_kernel_tensors_,
                                                                           kernel_mod, stream_);
    PROFILER_END(start_time, ProfilerModule::kKernel, ProfilerEvent::kKernelLaunch, GetAID().Name(), false);
    if (!ret) {
      std::string error_info = "Launch kernel failed: " + data_kernel_->fullname_with_scope();
      SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
    }
  } catch (const std::exception &e) {
    MsException::Instance().SetException();
    std::string error_info = "Launch kernel exception: " + data_kernel_->fullname_with_scope();
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), error_info);
  }

  // The debug actor is blocking, so we must wait for its callback message before continuing.
  if (debug_aid_ != nullptr) {
    SendDebugReq(context);
    return;
  }

  PostRun(context);
}

void DeviceQueueDataSourceActor::SendDebugReq(OpContext<DeviceTensor> *const context) {
  ActorDispatcher::SendSync(*debug_aid_, &DebugActor::DebugPostLaunch, data_kernel_, std::vector<DeviceTensor *>(),
                            buffers_.back(), device_contexts_[0], context, &GetAID());
  OnDebugFinish(context);
}

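// Reports the launch memory info of the data kernel to the recorder actor, if one is attached.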
void DeviceQueueDataSourceActor::SendRecorderInfo(OpContext<DeviceTensor> *const context) const {
  if (recorder_aid_ != nullptr && (!device_contexts_.empty())) {
    MS_EXCEPTION_IF_NULL(data_kernel_);
    ActorDispatcher::Send(*recorder_aid_, &RecorderActor::RecordInfo, data_kernel_->fullname_with_scope(), &mem_info_,
                          device_contexts_[0], context);
  }
}

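// Collects the mutable output device addresses of all host data nodes into one buffer entry for this step.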
void HostQueueDataSourceActor::FillDataBuffer() {
  // Construct device tensors.
  std::vector<DeviceTensor *> device_tensors;
  for (auto &node_with_index : data_node_with_indexs_) {
    MS_LOG(DEBUG) << "Node:" << node_with_index.first->DebugString() << " index:" << node_with_index.second;
    auto device_address = AnfAlgo::GetMutableOutputAddr(node_with_index.first, node_with_index.second, false);
    MS_EXCEPTION_IF_NULL(device_address);
    (void)device_tensors.emplace_back(device_address.get());
  }

  buffers_.push(device_tensors);
}

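// Requests device memory for the host data node tensors. When all data nodes share one device context a single
// allocation request is used; otherwise a batch allocation across all device contexts is sent.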
void HostQueueDataSourceActor::SendMemoryAllocReq(OpContext<DeviceTensor> *const context) {
  if (device_contexts_.empty()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Empty device contexts in host data source actor.");
  }
  auto &device_tensors = buffers_.back();
  if (ActorDispatcher::is_memory_allocation_sync()) {
    if (IsSameDeviceType()) {
      ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &device_tensors,
                                device_contexts_[0], context, GetAID());
    } else {
      ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::AllocateBatchMemory, &device_tensors,
                                &device_contexts_, context, GetAID());
    }
    OnMemoryAllocFinish(context);
  } else {
    if (IsSameDeviceType()) {
      ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::AllocateMemory, &device_tensors,
                            device_contexts_[0], context, GetAID());
    } else {
      ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::AllocateBatchMemory, &device_tensors,
                            &device_contexts_, context, GetAID());
    }
  }
}

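// Frees the device memory of the oldest buffer, again choosing between a single-device free and a batch free
// across heterogeneous device contexts.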
void HostQueueDataSourceActor::SendMemoryFreeReq(OpContext<DeviceTensor> *const context) {
  if (device_contexts_.empty()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Empty device contexts in host data source actor.");
  }
  auto &device_tensors = buffers_.front();
  if (ActorDispatcher::is_memory_free_sync()) {
    if (IsSameDeviceType()) {
      ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &device_tensors,
                                device_contexts_[0], context, GetAID());
    } else {
      ActorDispatcher::SendSync(memory_manager_aid_, &MemoryManagerActor::FreeBatchMemory, &device_tensors,
                                &device_contexts_, context, GetAID());
    }
  } else {
    if (IsSameDeviceType()) {
      ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::FreeMemory, &device_tensors, device_contexts_[0],
                            context, GetAID());
    } else {
      ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::FreeBatchMemory, &device_tensors,
                            &device_contexts_, context, GetAID());
    }
  }
}

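// Called after device memory has been allocated: pulls the host tensors from the host queue and copies each one
// into the matching device tensor, using an asynchronous copy when infer boost or two-phase infer is enabled and
// a synchronous host-to-device copy otherwise.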
void HostQueueDataSourceActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
  auto ms_context = MsContext::GetInstance();
  MS_EXCEPTION_IF_NULL(ms_context);
  MS_EXCEPTION_IF_NULL(context);
  if (IsRunningFailed(context)) {
    return;
  }
  if (buffers_.size() == 0) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "The data queue is empty.");
  }

  // Get host tensors from the host queue and get device tensors from the buffers.
  MS_EXCEPTION_IF_NULL(host_queue_);
  if (host_queue_->IsEmpty()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Host data queue is empty.");
  }
  auto &host_tensors = host_queue_->Pull();
  auto &device_tensors = buffers_.back();
  if (host_tensors.size() != device_tensors.size()) {
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context),
                                      "The length of host tensors is not equal to the length of device tensors.");
  }

  // Copy data from the host tensors to the device tensors.
  uint64_t start_time = 0;
  PROFILER_START(start_time);
  auto enable_async_copy = ms_context->IsEnableInferBoost() || IsTwoPhaseInfer();
  try {
    for (size_t i = 0; i < host_tensors.size(); ++i) {
      auto &host_tensor = host_tensors[i];
      auto &device_tensor = device_tensors[i];
      MS_EXCEPTION_IF_NULL(device_tensor);
      MS_EXCEPTION_IF_NULL(host_tensor);
      // Skip the device address that is not used.
      if (TEST_FLAG(device_tensor->flag(), device::kDeviceAddressFlagNotUsed)) {
        MS_LOG(DEBUG) << GetAID().Name() << " input index " << i << " is not used.";
        continue;
      }
      auto tensor_device_address = std::dynamic_pointer_cast<DeviceTensor>(host_tensor->device_address());
      // Sync data from the host tensor's device address to device_tensor.
      if (tensor_device_address != nullptr) {
        if (tensor_device_address.get() == device_tensor) {
          continue;
        }
        if (!Copy(device_tensor, tensor_device_address.get())) {
          SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Copy data failed.");
        }
        continue;
      }
      if (host_tensor->data_ptr() == nullptr && device_tensor->GetSize() == 0) {
        MS_LOG(INFO) << "Empty tuple sync";
        continue;
      }

      if (enable_async_copy) {
        if (!device_tensor->AsyncHostToDevice(LongToSize(host_tensor->data().nbytes()), host_tensor->data_type(),
                                              host_tensor->data_ptr()->data())) {
          SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "AsyncHostToDevice failed.");
        }
      } else {
        if (!device_tensor->SyncHostToDevice(
              trans::GetRuntimePaddingShape(data_node_with_indexs_[i].first, data_node_with_indexs_[i].second),
              LongToSize(host_tensor->data().nbytes()), host_tensor->data_type(),
              host_tensor->device_info().host_format_, host_tensor->data_ptr())) {
          SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "SyncHostToDevice failed.");
        }
      }

      if (IsDynamic(device_tensor->host_shape())) {
        device_tensor->set_host_shape(host_tensor->shape());
      }
    }
  } catch (const std::exception &e) {
    MsException::Instance().SetException();
    SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context), "Host data source actor run exception.");
  }
  PROFILER_END(start_time, ProfilerModule::kRuntime, ProfilerEvent::kCopyData, GetAID().Name(), false);

  PostRun(context);
}

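// Returns the buffer position registered for the given data node and output index; throws if the node was never
// registered with this actor.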
size_t HostQueueDataSourceActor::FetchNodePosition(const KernelWithIndex &data_node) const {
  MS_EXCEPTION_IF_NULL(data_node.first);
  const auto &iter = data_node_position_map_.find(data_node);
  if (iter == data_node_position_map_.end()) {
    MS_LOG_WITH_NODE(EXCEPTION, data_node.first)
      << "Data node: " << data_node.first->DebugString() << " index:" << data_node.second << " does not exist.";
  }
  return iter->second;
}

KernelWithIndex HostQueueDataSourceActor::FetchNode(size_t node_position) const {
  if (node_position >= data_node_with_indexs_.size()) {
    MS_LOG(EXCEPTION) << "The position of node is out of range: " << node_position;
  }
  return data_node_with_indexs_[node_position];
}

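// Returns true when all device contexts of this actor are identical, so the plain (non-batch) memory requests can
// be used.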
bool HostQueueDataSourceActor::IsSameDeviceType() const {
  for (size_t i = 1; i < device_contexts_.size(); i++) {
    if (device_contexts_[i] != device_contexts_[0]) {
      return false;
    }
  }
  return true;
}

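// Step-end cleanup: pops the pulled host tensors from the host queue and, for data node addresses that came from
// input tensors and are not persisted by the runtime, replaces them with fresh device addresses holding no device
// memory so the borrowed pointers are dropped.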
void HostQueueDataSourceActor::ReleaseData() {
  // At step end, the host queue tensors need to be freed.
  MS_EXCEPTION_IF_NULL(host_queue_);
  host_queue_->Pop();

  // At step end, the data node addresses need to be released.
  for (auto &data_node_with_index : data_node_with_indexs_) {
    if (!AnfAlgo::OutputAddrExist(data_node_with_index.first, data_node_with_index.second)) {
      continue;
    }
    auto old_address = AnfAlgo::GetMutableOutputAddr(data_node_with_index.first, data_node_with_index.second);
    MS_EXCEPTION_IF_NULL(old_address);
    if (old_address->GetPtr() == nullptr) {
      // The address memory is already freed.
      continue;
    }
    // The address comes from an input tensor and is not used by the runtime.
    if (old_address->original_ref_count() == SIZE_MAX && !old_address->is_ptr_persisted()) {
      auto device_context = device::DeviceContextManager::GetInstance().GetOrCreateDeviceContext(
        {old_address->device_name(), old_address->device_id()});
      MS_EXCEPTION_IF_NULL(device_context);
      const auto &kernel_tensor = old_address->kernel_tensor();
      MS_EXCEPTION_IF_NULL(kernel_tensor);
      auto new_kernel_tensor = kernel_tensor->CloneKernelTensor();
      MS_EXCEPTION_IF_NULL(new_kernel_tensor);
      new_kernel_tensor->set_device_ptr(nullptr);

      auto new_address = device_context->device_res_manager_->CreateDeviceAddress(new_kernel_tensor);
      MS_EXCEPTION_IF_NULL(new_address);
      MS_LOG(DEBUG) << "Create device tensor:" << new_address << " type:" << new_address->type_id()
                    << ", kernel tensor addr:" << new_kernel_tensor.get();
      new_address->set_original_ref_count(old_address->original_ref_count());
      new_address->ResetRefCount();
      new_address->set_flag(old_address->flag());
      auto [node, index] = old_address->GetNodeIndex();
      new_address->SetNodeIndex(node, index);
      AnfAlgo::SetOutputAddr(new_address, data_node_with_index.second, data_node_with_index.first.get());
    }
  }
}
}  // namespace runtime
}  // namespace mindspore