• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "plugin/device/ascend/hal/device/mbuf_receive_manager.h"
17 #include <cstddef>
18 #include <ctime>
19 #include <fstream>
20 #include <ios>
21 #include <memory>
22 #include <string>
23 #include <thread>
24 #include "include/common/utils/utils.h"
25 #include "ir/tensor.h"
26 #include "plugin/device/ascend/hal/device/ascend_data_queue.h"
27 #include "utils/file_utils.h"
28 #include "utils/ms_context.h"
29 #include "utils/shape_utils.h"
30 #include "transform/symbol/acl_tdt_symbol.h"
31 #include "transform/symbol/symbol_utils.h"
32 
33 namespace mindspore::device::ascend {
34 
35 namespace {
// Data may not be received when the process exits; reserve a timeout period.
// NOTE(review): this constant is not referenced anywhere in this translation
// unit's visible code — confirm it is still needed before removing.
constexpr std::chrono::milliseconds stop_time_out{100};
38 
CopyDataToTensor(const uint8_t * src_addr,mindspore::tensor::TensorPtr tensor_ptr,const size_t size)39 bool CopyDataToTensor(const uint8_t *src_addr, mindspore::tensor::TensorPtr tensor_ptr, const size_t size) {
40   MS_EXCEPTION_IF_NULL(src_addr);
41   MS_EXCEPTION_IF_NULL(tensor_ptr);
42   auto *dst_addr = reinterpret_cast<uint8_t *>(tensor_ptr->data_c());
43   MS_EXCEPTION_IF_NULL(dst_addr);
44   size_t dst_size = static_cast<size_t>(tensor_ptr->data().nbytes());
45   MS_EXCEPTION_IF_CHECK_FAIL(dst_size >= size, "The destination size is smaller than the source size.");
46   size_t remain_size = size;
47   while (remain_size > SECUREC_MEM_MAX_LEN) {
48     auto cp_ret = memcpy_s(dst_addr, SECUREC_MEM_MAX_LEN, src_addr, SECUREC_MEM_MAX_LEN);
49     if (cp_ret != EOK) {
50       MS_LOG(ERROR) << "Failed to copy the memory to py::tensor " << cp_ret;
51       return false;
52     }
53     remain_size -= SECUREC_MEM_MAX_LEN;
54     dst_addr += SECUREC_MEM_MAX_LEN;
55     src_addr += SECUREC_MEM_MAX_LEN;
56   }
57   if (remain_size != 0U) {
58     auto cp_ret = memcpy_s(dst_addr, remain_size, src_addr, remain_size);
59     if (cp_ret != EOK) {
60       MS_LOG(ERROR) << "Failed to copy the memory to py::tensor " << cp_ret;
61       return false;
62     }
63   }
64 
65   return true;
66 }
67 
ConvertDataItemToTensorPtr(acltdtDataItem * item)68 mindspore::tensor::TensorPtr ConvertDataItemToTensorPtr(acltdtDataItem *item) {
69   size_t dim_num = CALL_ASCEND_API(acltdtGetDimNumFromItem, item);
70   void *acl_addr = CALL_ASCEND_API(acltdtGetDataAddrFromItem, item);
71   size_t acl_data_size = CALL_ASCEND_API(acltdtGetDataSizeFromItem, item);
72   aclDataType acl_data_type = CALL_ASCEND_API(acltdtGetDataTypeFromItem, item);
73 
74   auto acl_data = reinterpret_cast<uint8_t *>(acl_addr);
75   if (acl_data_size > 0) {
76     MS_EXCEPTION_IF_NULL(acl_data);
77   }
78 
79   ShapeVector tensor_shape;
80   tensor_shape.resize(dim_num);
81 
82   if (CALL_ASCEND_API(acltdtGetDimsFromItem, item, tensor_shape.data(), dim_num) != ACL_SUCCESS) {
83     MS_LOG(ERROR) << "ACL failed to get dim-size from acl channel data";
84     return nullptr;
85   }
86 
87   auto type_iter = kAclDataTypeMap.find(acl_data_type);
88   if (type_iter == kAclDataTypeMap.end()) {
89     MS_LOG(ERROR) << "The type of aclData not support: " << acl_data_type;
90     return nullptr;
91   }
92   auto type_id = type_iter->second;
93   auto tensor_ptr = std::make_shared<mindspore::tensor::Tensor>(type_id, tensor_shape);
94   if (acl_data_size == 0) {
95     return tensor_ptr;
96   }
97   if (CopyDataToTensor(acl_data, tensor_ptr, acl_data_size)) {
98     return tensor_ptr;
99   }
100   return nullptr;
101 }
102 }  // namespace
103 
ProcessFullTensor(acltdtDataItem * item)104 bool ScopeAclTdtDataset::ProcessFullTensor(acltdtDataItem *item) {
105   aclDataType acl_data_type = CALL_ASCEND_API(acltdtGetDataTypeFromItem, item);
106   if (acl_data_type == ACL_STRING) {
107     void *acl_addr = CALL_ASCEND_API(acltdtGetDataAddrFromItem, item);
108     size_t acl_data_size = CALL_ASCEND_API(acltdtGetDataSizeFromItem, item);
109     data_items_.emplace_back(std::string(static_cast<char *>(acl_addr), acl_data_size));
110     return true;
111   }
112 
113   auto tensor_ptr = ConvertDataItemToTensorPtr(item);
114   if (tensor_ptr == nullptr) {
115     return false;
116   }
117   data_items_.emplace_back(tensor_ptr);
118   return true;
119 }
120 
// Processes one data item that may be a slice of a larger tensor. Tensors too
// large for a single transfer arrive as `slice_num` consecutive items with
// slice ids 0..slice_num-1; their payload bytes are accumulated into
// sliced_tensor_ and assembled by FinishSliceTensor() when the last slice
// arrives. Returns false on ACL query failure or an out-of-order slice id.
bool ScopeAclTdtDataset::ProcessSliceTensor(acltdtDataItem *item) {
  size_t slice_num, slice_id;
  auto ret = CALL_ASCEND_API(acltdtGetSliceInfoFromItem, item, &slice_num, &slice_id);
  if (ret != ACL_SUCCESS) {
    MS_LOG(WARNING) << "Get slice info failed with error code " << ret;
    return false;
  }
  MS_LOG(DEBUG) << "Process slice tensor, slice_num=" << slice_num << ", slice_id=" << slice_id;

  // slice_num is 0, that means the tensor is not sliced
  if (slice_num == 0) {
    // A complete tensor must not interrupt a partially-assembled sliced one.
    if (sliced_tensor_ != nullptr) {
      MS_LOG(WARNING) << "Expect slice id " << sliced_tensor_->slice_id_ << ", but got slice id " << slice_id;
      return false;
    }
    return ProcessFullTensor(item);
  }

  // current data item is just a slice of tensor
  if (sliced_tensor_ == nullptr) {
    // First slice: capture dtype and shape so the full tensor can be rebuilt
    // later by FinishSliceTensor().
    size_t dim_num = CALL_ASCEND_API(acltdtGetDimNumFromItem, item);
    aclDataType acl_data_type = CALL_ASCEND_API(acltdtGetDataTypeFromItem, item);

    ShapeVector tensor_shape;
    tensor_shape.resize(dim_num);

    if (CALL_ASCEND_API(acltdtGetDimsFromItem, item, tensor_shape.data(), dim_num) != ACL_SUCCESS) {
      MS_LOG(WARNING) << "ACL failed to get dim-size from acl channel data";
      return false;
    }
    sliced_tensor_ = std::make_shared<SlicedTensor>(slice_num, acl_data_type, tensor_shape);
  }
  // Slices must arrive strictly in order; reject anything out of sequence.
  if (slice_id != sliced_tensor_->slice_id_) {
    MS_LOG(WARNING) << "Expect slice id " << sliced_tensor_->slice_id_ << ", but got slice id " << slice_id;
    return false;
  }

  void *acl_addr = CALL_ASCEND_API(acltdtGetDataAddrFromItem, item);
  size_t acl_data_size = CALL_ASCEND_API(acltdtGetDataSizeFromItem, item);
  if (acl_data_size > 0) {
    sliced_tensor_->buffer_ << std::string(static_cast<char *>(acl_addr), acl_data_size);
  }
  sliced_tensor_->slice_id_ += 1;
  // when received last piece of tensor
  if (sliced_tensor_->slice_id_ == sliced_tensor_->slice_num_) {
    return FinishSliceTensor();
  }
  return true;
}
170 
FinishSliceTensor()171 bool ScopeAclTdtDataset::FinishSliceTensor() {
172   aclDataType acl_data_type = sliced_tensor_->data_type_;
173   std::string tensor_data = sliced_tensor_->buffer_.str();
174   if (acl_data_type == ACL_STRING) {
175     data_items_.emplace_back(tensor_data);
176   } else {
177     auto type_iter = kAclDataTypeMap.find(acl_data_type);
178     if (type_iter == kAclDataTypeMap.end()) {
179       MS_LOG(WARNING) << "The type of aclData not support: " << acl_data_type;
180       return false;
181     }
182     auto type_id = type_iter->second;
183     auto tensor_ptr = std::make_shared<mindspore::tensor::Tensor>(type_id, sliced_tensor_->tensor_shape_);
184     if (!CopyDataToTensor(reinterpret_cast<const uint8_t *>(tensor_data.c_str()), tensor_ptr, tensor_data.size())) {
185       return false;
186     }
187     data_items_.emplace_back(tensor_ptr);
188   }
189   sliced_tensor_ = nullptr;
190   return true;
191 }
192 
// Consumes every data item in `acl_dataset`, dispatching each one to
// full-tensor or slice-tensor processing according to its tensor type.
// Returns true when a complete output has been assembled (data_items_ is
// ready for the callback); returns false when more slices are still expected
// or when an error caused all partially-processed data to be dropped.
bool ScopeAclTdtDataset::ProcessDataset(acltdtDataset *acl_dataset) {
  // For non-sliced data every dataset is a complete output; for sliced data
  // the output only completes on an ACL_TENSOR_DATA_END_TENSOR item below.
  bool is_end_output = (tensor_type_ != ACL_TENSOR_DATA_SLICE_TENSOR);
  bool error_flag = false;

  // NOTE: ONLY the FIRST dataset containing the dataset name
  // May be the acltdtDataset is empty but has name
  if (tensor_type_ == ACL_TENSOR_DATA_UNDEFINED && dataset_name_.empty()) {
    dataset_name_ = CALL_ASCEND_API(acltdtGetDatasetName, acl_dataset);
  }

  size_t acl_dataset_size = CALL_ASCEND_API(acltdtGetDatasetSize, acl_dataset);
  MS_LOG(DEBUG) << "Receive one dataset with size " << acl_dataset_size << ", tensor_type_=" << tensor_type_
                << ", sliced_tensor_=" << sliced_tensor_.get();

  for (size_t i = 0; i < acl_dataset_size; i++) {
    acltdtDataItem *item = CALL_ASCEND_API(acltdtGetDataItem, acl_dataset, i);
    MS_EXCEPTION_IF_NULL(item);

    auto type = CALL_ASCEND_API(acltdtGetTensorTypeFromItem, item);
    MS_LOG(DEBUG) << "Process data item " << i << "/" << acl_dataset_size << ", type is " << type << ", data type is "
                  << CALL_ASCEND_API(acltdtGetDataTypeFromItem, item) << ", data length is "
                  << CALL_ASCEND_API(acltdtGetDataSizeFromItem, item);

    if (type == ACL_TENSOR_DATA_END_OF_SEQUENCE) {
      MS_LOG(INFO) << "Encounter end of sequence";
      break;
    }
    // Anything other than tensor / slice / end-tensor is skipped, not fatal.
    if (type != ACL_TENSOR_DATA_TENSOR && type != ACL_TENSOR_DATA_SLICE_TENSOR && type != ACL_TENSOR_DATA_END_TENSOR) {
      MS_LOG(WARNING) << "Encounter invalid data item of type " << type << ", ignore it.";
      continue;
    }
    // Reject streams that mix full tensors with sliced tensors.
    if (!CheckAndSetTensorType(type)) {
      error_flag = true;
      break;
    }

    if (type == ACL_TENSOR_DATA_TENSOR) {
      if (!ProcessFullTensor(item)) {
        error_flag = true;
        break;
      }
      continue;
    }

    // dataitem is a slice tensor, i.e. type of which is ACL_TENSOR_DATA_SLICE_TENSOR or ACL_TENSOR_DATA_END_TENSOR
    is_end_output = false;
    if (!ProcessSliceTensor(item)) {
      error_flag = true;
      break;
    }

    if (type == ACL_TENSOR_DATA_END_TENSOR) {
      // reach the end of current output
      is_end_output = true;
      break;
    }
  }

  if (error_flag) {
    // when encounter error, drop out all processed data
    is_end_output = false;
    Reset();
  }

  MS_LOG(DEBUG) << "Return with is_end_output=" << std::boolalpha << is_end_output;

  return is_end_output;
}
261 
CheckAndSetTensorType(acltdtTensorType tensor_type)262 bool ScopeAclTdtDataset::CheckAndSetTensorType(acltdtTensorType tensor_type) {
263   switch (tensor_type) {
264     case ACL_TENSOR_DATA_TENSOR: {
265       if (tensor_type_ == ACL_TENSOR_DATA_UNDEFINED) {
266         tensor_type_ = ACL_TENSOR_DATA_TENSOR;
267       } else if (tensor_type_ != ACL_TENSOR_DATA_TENSOR) {
268         MS_LOG(WARNING) << "Encounter mismatched tensor type, expect " << tensor_type_ << " but got " << tensor_type;
269         return false;
270       }
271       break;
272     }
273     case ACL_TENSOR_DATA_SLICE_TENSOR:
274     case ACL_TENSOR_DATA_END_TENSOR: {
275       if (tensor_type_ == ACL_TENSOR_DATA_UNDEFINED) {
276         tensor_type_ = ACL_TENSOR_DATA_SLICE_TENSOR;
277       } else if (tensor_type_ != ACL_TENSOR_DATA_SLICE_TENSOR) {
278         MS_LOG(WARNING) << "Encounter mismatched tensor type, expect " << tensor_type_ << " but got " << tensor_type;
279         return false;
280       }
281       break;
282     }
283     default:
284       MS_LOG(WARNING) << "Encounter invalid tensor type " << tensor_type << ", ignore it.";
285       return false;
286   }
287   return true;
288 }
289 
// Creates the mbuf receive channel `channel_name` on `device_id` with queue
// depth `capacity`, and starts a worker thread that pumps received data into
// `func`. If channel creation fails, only a warning is logged and no worker
// thread is started — the handler is left inert.
MbufDataHandler::MbufDataHandler(MbufFuncType func, uint32_t device_id, string channel_name, string prim_name,
                                 size_t capacity, int32_t timeout)
    : func_(func),
      device_id_(device_id),
      channel_name_(channel_name),
      prim_name_(prim_name),
      capacity_(capacity),
      timeout_(timeout) {
  MS_LOG(INFO) << "Channel " << channel_name_ << " begins the construction process.";
  acl_handle_ = CALL_ASCEND_API(acltdtCreateChannelWithCapacity, device_id_, channel_name_.c_str(), capacity_);
  if (acl_handle_ == nullptr) {
    string warning_info = "The creation of " + channel_name_ + " channel failed";
    if (!prim_name_.empty()) {
      warning_info += " and the corresponding " + prim_name_ + " Primitive cannot be used in GRAPH_MODE";
    }
    MS_LOG(WARNING) << warning_info;
    return;
  }
  // Only start the receiver thread once we hold a valid channel handle.
  thread_ = std::thread(&MbufDataHandler::HandleData, this);
}
310 
~MbufDataHandler()311 MbufDataHandler::~MbufDataHandler() {
312   MS_LOG(INFO) << "Channel " << channel_name_ << " begins the destruction process.";
313   // Stop the child thread from receiving data
314   stop_receive_.store(true, std::memory_order_acq_rel);
315   if (thread_.joinable()) {
316     thread_.join();
317   }
318   if (acl_handle_) {
319     aclError status = CALL_ASCEND_API(acltdtDestroyChannel, acl_handle_);
320     acl_handle_ = nullptr;
321     if (status != ACL_SUCCESS) {
322       MS_LOG(ERROR) << "Channel " << channel_name_ << " failed destroy acl channel. Error code: " << status;
323       return;
324     }
325   } else {
326     MS_LOG(INFO) << "Channel " << channel_name_ << ": acl handle has been destroyed.";
327   }
328 }
329 
ReceiveAndProcessData(ScopeAclTdtDataset * dataset)330 bool MbufDataHandler::ReceiveAndProcessData(ScopeAclTdtDataset *dataset) {
331   aclError status = CALL_ASCEND_API(acltdtReceiveTensor, acl_handle_, dataset->Get(), timeout_);
332   if (status != ACL_SUCCESS) {
333     if (status == ACL_ERROR_RT_QUEUE_EMPTY) {
334       return true;
335     }
336     MS_LOG(ERROR) << "Channel " << channel_name_ << " failed to receive tensor. Error code is " << status;
337     return false;
338   }
339 
340   if (dataset->ProcessDataset(dataset->Get())) {
341     func_(*dataset);
342     dataset->Reset();
343   }
344 
345   return true;
346 }
347 
QueryChannelSize(size_t * size)348 bool MbufDataHandler::QueryChannelSize(size_t *size) {
349   aclError status = CALL_ASCEND_API(acltdtQueryChannelSize, acl_handle_, size);
350   if (status != ACL_SUCCESS) {
351     MS_LOG(ERROR) << "Channel " << channel_name_ << " failed to QueryChannelSize. Error code is " << status;
352     return false;
353   }
354   return true;
355 }
356 
HandleData()357 void MbufDataHandler::HandleData() {
358   MS_LOG(INFO) << "Channel " << channel_name_ << " starts executing HandleData.";
359   ScopeAclTdtDataset scope_acl_dataset;
360   if (scope_acl_dataset.Get() == nullptr) {
361     MS_LOG(ERROR) << "Channel " << channel_name_ << " failed to create aclDateaset.";
362     return;
363   }
364 
365   while (!stop_receive_.load()) {
366     if (!ReceiveAndProcessData(&scope_acl_dataset)) {
367       return;
368     }
369   }
370 
371   size_t channel_size = 0;
372   if (!QueryChannelSize(&channel_size)) {
373     return;
374   }
375   while (channel_size > 0) {
376     if (!ReceiveAndProcessData(&scope_acl_dataset)) {
377       return;
378     }
379     if (!QueryChannelSize(&channel_size)) {
380       return;
381     }
382   }
383   if (channel_size > 0) {
384     MS_LOG(ERROR) << "Channel " << channel_name_ << " has stopped receiving data, and " << channel_size
385                   << " pieces of data have not been received.";
386     return;
387   }
388 }
389 }  // namespace mindspore::device::ascend
390