/**
 * Copyright 2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "plugin/device/ascend/hal/device/mbuf_receive_manager.h"
#include <chrono>
#include <cstddef>
#include <ctime>
#include <fstream>
#include <ios>
#include <memory>
#include <string>
#include <thread>
#include "include/common/utils/utils.h"
#include "ir/tensor.h"
#include "plugin/device/ascend/hal/device/ascend_data_queue.h"
#include "utils/file_utils.h"
#include "utils/ms_context.h"
#include "utils/shape_utils.h"
#include "transform/symbol/acl_tdt_symbol.h"
#include "transform/symbol/symbol_utils.h"

namespace mindspore::device::ascend {

namespace {
// Data may not be received when the process exits; reserve a timeout period.
constexpr std::chrono::milliseconds stop_time_out{100};

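// Copy `size` bytes from `src_addr` into the tensor's data buffer. memcpy_s limits a single copy to
// SECUREC_MEM_MAX_LEN bytes, so the data is copied in chunks of at most that size.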
bool CopyDataToTensor(const uint8_t *src_addr, mindspore::tensor::TensorPtr tensor_ptr, const size_t size) {
  MS_EXCEPTION_IF_NULL(src_addr);
  MS_EXCEPTION_IF_NULL(tensor_ptr);
  auto *dst_addr = reinterpret_cast<uint8_t *>(tensor_ptr->data_c());
  MS_EXCEPTION_IF_NULL(dst_addr);
  size_t dst_size = static_cast<size_t>(tensor_ptr->data().nbytes());
  MS_EXCEPTION_IF_CHECK_FAIL(dst_size >= size, "The destination size is smaller than the source size.");
  size_t remain_size = size;
  while (remain_size > SECUREC_MEM_MAX_LEN) {
    auto cp_ret = memcpy_s(dst_addr, SECUREC_MEM_MAX_LEN, src_addr, SECUREC_MEM_MAX_LEN);
    if (cp_ret != EOK) {
      MS_LOG(ERROR) << "Failed to copy the memory to tensor, error code is " << cp_ret;
      return false;
    }
    remain_size -= SECUREC_MEM_MAX_LEN;
    dst_addr += SECUREC_MEM_MAX_LEN;
    src_addr += SECUREC_MEM_MAX_LEN;
  }
  if (remain_size != 0U) {
    auto cp_ret = memcpy_s(dst_addr, remain_size, src_addr, remain_size);
    if (cp_ret != EOK) {
      MS_LOG(ERROR) << "Failed to copy the memory to tensor, error code is " << cp_ret;
      return false;
    }
  }

  return true;
}

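// Build a Tensor from an acltdtDataItem: read its data type, shape and payload, allocate a Tensor of
// the matching type and shape, then copy the payload into it. Returns nullptr on failure.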
mindspore::tensor::TensorPtr ConvertDataItemToTensorPtr(acltdtDataItem *item) {
  size_t dim_num = CALL_ASCEND_API(acltdtGetDimNumFromItem, item);
  void *acl_addr = CALL_ASCEND_API(acltdtGetDataAddrFromItem, item);
  size_t acl_data_size = CALL_ASCEND_API(acltdtGetDataSizeFromItem, item);
  aclDataType acl_data_type = CALL_ASCEND_API(acltdtGetDataTypeFromItem, item);

  auto acl_data = reinterpret_cast<uint8_t *>(acl_addr);
  if (acl_data_size > 0) {
    MS_EXCEPTION_IF_NULL(acl_data);
  }

  ShapeVector tensor_shape;
  tensor_shape.resize(dim_num);

  if (CALL_ASCEND_API(acltdtGetDimsFromItem, item, tensor_shape.data(), dim_num) != ACL_SUCCESS) {
    MS_LOG(ERROR) << "ACL failed to get dim-size from acl channel data";
    return nullptr;
  }

  auto type_iter = kAclDataTypeMap.find(acl_data_type);
  if (type_iter == kAclDataTypeMap.end()) {
    MS_LOG(ERROR) << "The type of aclData is not supported: " << acl_data_type;
    return nullptr;
  }
  auto type_id = type_iter->second;
  auto tensor_ptr = std::make_shared<mindspore::tensor::Tensor>(type_id, tensor_shape);
  if (acl_data_size == 0) {
    return tensor_ptr;
  }
  if (CopyDataToTensor(acl_data, tensor_ptr, acl_data_size)) {
    return tensor_ptr;
  }
  return nullptr;
}
}  // namespace

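// Store one complete (unsliced) data item: string data is kept as a std::string, any other data type
// is converted to a Tensor via ConvertDataItemToTensorPtr.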
bool ScopeAclTdtDataset::ProcessFullTensor(acltdtDataItem *item) {
  aclDataType acl_data_type = CALL_ASCEND_API(acltdtGetDataTypeFromItem, item);
  if (acl_data_type == ACL_STRING) {
    void *acl_addr = CALL_ASCEND_API(acltdtGetDataAddrFromItem, item);
    size_t acl_data_size = CALL_ASCEND_API(acltdtGetDataSizeFromItem, item);
    data_items_.emplace_back(std::string(static_cast<char *>(acl_addr), acl_data_size));
    return true;
  }

  auto tensor_ptr = ConvertDataItemToTensorPtr(item);
  if (tensor_ptr == nullptr) {
    return false;
  }
  data_items_.emplace_back(tensor_ptr);
  return true;
}

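// Handle one slice of a tensor: the first slice creates the SlicedTensor holder, later slices are
// appended to its buffer in order, and the full tensor is assembled once all slices have arrived.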
bool ScopeAclTdtDataset::ProcessSliceTensor(acltdtDataItem *item) {
  size_t slice_num = 0;
  size_t slice_id = 0;
  auto ret = CALL_ASCEND_API(acltdtGetSliceInfoFromItem, item, &slice_num, &slice_id);
  if (ret != ACL_SUCCESS) {
    MS_LOG(WARNING) << "Get slice info failed with error code " << ret;
    return false;
  }
  MS_LOG(DEBUG) << "Process slice tensor, slice_num=" << slice_num << ", slice_id=" << slice_id;

  // slice_num == 0 means the tensor is not sliced
  if (slice_num == 0) {
    if (sliced_tensor_ != nullptr) {
      MS_LOG(WARNING) << "Received an unsliced tensor while assembling a sliced tensor, expect slice id "
                      << sliced_tensor_->slice_id_ << ", but got slice id " << slice_id;
      return false;
    }
    return ProcessFullTensor(item);
  }

  // the current data item is just one slice of a tensor
  if (sliced_tensor_ == nullptr) {
    size_t dim_num = CALL_ASCEND_API(acltdtGetDimNumFromItem, item);
    aclDataType acl_data_type = CALL_ASCEND_API(acltdtGetDataTypeFromItem, item);

    ShapeVector tensor_shape;
    tensor_shape.resize(dim_num);

    if (CALL_ASCEND_API(acltdtGetDimsFromItem, item, tensor_shape.data(), dim_num) != ACL_SUCCESS) {
      MS_LOG(WARNING) << "ACL failed to get dim-size from acl channel data";
      return false;
    }
    sliced_tensor_ = std::make_shared<SlicedTensor>(slice_num, acl_data_type, tensor_shape);
  }
  if (slice_id != sliced_tensor_->slice_id_) {
    MS_LOG(WARNING) << "Expect slice id " << sliced_tensor_->slice_id_ << ", but got slice id " << slice_id;
    return false;
  }

  void *acl_addr = CALL_ASCEND_API(acltdtGetDataAddrFromItem, item);
  size_t acl_data_size = CALL_ASCEND_API(acltdtGetDataSizeFromItem, item);
  if (acl_data_size > 0) {
    sliced_tensor_->buffer_ << std::string(static_cast<char *>(acl_addr), acl_data_size);
  }
  sliced_tensor_->slice_id_ += 1;
  // the last slice of the tensor has been received
  if (sliced_tensor_->slice_id_ == sliced_tensor_->slice_num_) {
    return FinishSliceTensor();
  }
  return true;
}

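// Assemble the buffered slices into a single data item (a string or a Tensor) and clear the slice state.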
bool ScopeAclTdtDataset::FinishSliceTensor() {
  aclDataType acl_data_type = sliced_tensor_->data_type_;
  std::string tensor_data = sliced_tensor_->buffer_.str();
  if (acl_data_type == ACL_STRING) {
    data_items_.emplace_back(tensor_data);
  } else {
    auto type_iter = kAclDataTypeMap.find(acl_data_type);
    if (type_iter == kAclDataTypeMap.end()) {
      MS_LOG(WARNING) << "The type of aclData is not supported: " << acl_data_type;
      return false;
    }
    auto type_id = type_iter->second;
    auto tensor_ptr = std::make_shared<mindspore::tensor::Tensor>(type_id, sliced_tensor_->tensor_shape_);
    if (!CopyDataToTensor(reinterpret_cast<const uint8_t *>(tensor_data.c_str()), tensor_ptr, tensor_data.size())) {
      return false;
    }
    data_items_.emplace_back(tensor_ptr);
  }
  sliced_tensor_ = nullptr;
  return true;
}

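// Process every data item of one received acltdtDataset. Returns true when a complete output has been
// collected (full tensors, or the last slice marked ACL_TENSOR_DATA_END_TENSOR), and false while
// slices are still pending or when an error caused the collected data to be dropped.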
bool ScopeAclTdtDataset::ProcessDataset(acltdtDataset *acl_dataset) {
  bool is_end_output = (tensor_type_ != ACL_TENSOR_DATA_SLICE_TENSOR);
  bool error_flag = false;

  // NOTE: only the FIRST dataset carries the dataset name.
  // The acltdtDataset may be empty but still have a name.
  if (tensor_type_ == ACL_TENSOR_DATA_UNDEFINED && dataset_name_.empty()) {
    dataset_name_ = CALL_ASCEND_API(acltdtGetDatasetName, acl_dataset);
  }

  size_t acl_dataset_size = CALL_ASCEND_API(acltdtGetDatasetSize, acl_dataset);
  MS_LOG(DEBUG) << "Receive one dataset with size " << acl_dataset_size << ", tensor_type_=" << tensor_type_
                << ", sliced_tensor_=" << sliced_tensor_.get();

  for (size_t i = 0; i < acl_dataset_size; i++) {
    acltdtDataItem *item = CALL_ASCEND_API(acltdtGetDataItem, acl_dataset, i);
    MS_EXCEPTION_IF_NULL(item);

    auto type = CALL_ASCEND_API(acltdtGetTensorTypeFromItem, item);
    MS_LOG(DEBUG) << "Process data item " << i << "/" << acl_dataset_size << ", type is " << type << ", data type is "
                  << CALL_ASCEND_API(acltdtGetDataTypeFromItem, item) << ", data length is "
                  << CALL_ASCEND_API(acltdtGetDataSizeFromItem, item);

    if (type == ACL_TENSOR_DATA_END_OF_SEQUENCE) {
      MS_LOG(INFO) << "Encounter end of sequence";
      break;
    }
    if (type != ACL_TENSOR_DATA_TENSOR && type != ACL_TENSOR_DATA_SLICE_TENSOR && type != ACL_TENSOR_DATA_END_TENSOR) {
      MS_LOG(WARNING) << "Encounter invalid data item of type " << type << ", ignore it.";
      continue;
    }
    if (!CheckAndSetTensorType(type)) {
      error_flag = true;
      break;
    }

    if (type == ACL_TENSOR_DATA_TENSOR) {
      if (!ProcessFullTensor(item)) {
        error_flag = true;
        break;
      }
      continue;
    }

    // the data item is a tensor slice, i.e. its type is ACL_TENSOR_DATA_SLICE_TENSOR or ACL_TENSOR_DATA_END_TENSOR
    is_end_output = false;
    if (!ProcessSliceTensor(item)) {
      error_flag = true;
      break;
    }

    if (type == ACL_TENSOR_DATA_END_TENSOR) {
      // reach the end of the current output
      is_end_output = true;
      break;
    }
  }

  if (error_flag) {
    // on error, drop all processed data
    is_end_output = false;
    Reset();
  }

  MS_LOG(DEBUG) << "Return with is_end_output=" << std::boolalpha << is_end_output;

  return is_end_output;
}

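// Record the tensor type of the current output on the first item and reject items whose type does not
// match it: full tensors and tensor slices must not be mixed within one output.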
bool ScopeAclTdtDataset::CheckAndSetTensorType(acltdtTensorType tensor_type) {
  switch (tensor_type) {
    case ACL_TENSOR_DATA_TENSOR: {
      if (tensor_type_ == ACL_TENSOR_DATA_UNDEFINED) {
        tensor_type_ = ACL_TENSOR_DATA_TENSOR;
      } else if (tensor_type_ != ACL_TENSOR_DATA_TENSOR) {
        MS_LOG(WARNING) << "Encounter mismatched tensor type, expect " << tensor_type_ << " but got " << tensor_type;
        return false;
      }
      break;
    }
    case ACL_TENSOR_DATA_SLICE_TENSOR:
    case ACL_TENSOR_DATA_END_TENSOR: {
      if (tensor_type_ == ACL_TENSOR_DATA_UNDEFINED) {
        tensor_type_ = ACL_TENSOR_DATA_SLICE_TENSOR;
      } else if (tensor_type_ != ACL_TENSOR_DATA_SLICE_TENSOR) {
        MS_LOG(WARNING) << "Encounter mismatched tensor type, expect " << tensor_type_ << " but got " << tensor_type;
        return false;
      }
      break;
    }
    default:
      MS_LOG(WARNING) << "Encounter invalid tensor type " << tensor_type << ", ignore it.";
      return false;
  }
  return true;
}

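// Create an acl channel with the given capacity and start the worker thread that receives data from
// it. If the channel cannot be created, only a warning is logged and no worker thread is started.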
MbufDataHandler::MbufDataHandler(MbufFuncType func, uint32_t device_id, string channel_name, string prim_name,
                                 size_t capacity, int32_t timeout)
    : func_(func),
      device_id_(device_id),
      channel_name_(channel_name),
      prim_name_(prim_name),
      capacity_(capacity),
      timeout_(timeout) {
  MS_LOG(INFO) << "Channel " << channel_name_ << " begins the construction process.";
  acl_handle_ = CALL_ASCEND_API(acltdtCreateChannelWithCapacity, device_id_, channel_name_.c_str(), capacity_);
  if (acl_handle_ == nullptr) {
    string warning_info = "The creation of " + channel_name_ + " channel failed";
    if (!prim_name_.empty()) {
      warning_info += " and the corresponding " + prim_name_ + " Primitive cannot be used in GRAPH_MODE";
    }
    MS_LOG(WARNING) << warning_info;
    return;
  }
  thread_ = std::thread(&MbufDataHandler::HandleData, this);
}

MbufDataHandler::~MbufDataHandler() {
  MS_LOG(INFO) << "Channel " << channel_name_ << " begins the destruction process.";
  // Stop the child thread from receiving data
  stop_receive_.store(true, std::memory_order_release);
  if (thread_.joinable()) {
    thread_.join();
  }
  if (acl_handle_) {
    aclError status = CALL_ASCEND_API(acltdtDestroyChannel, acl_handle_);
    acl_handle_ = nullptr;
    if (status != ACL_SUCCESS) {
      MS_LOG(ERROR) << "Channel " << channel_name_ << " failed to destroy acl channel. Error code: " << status;
      return;
    }
  } else {
    MS_LOG(INFO) << "Channel " << channel_name_ << ": acl handle has been destroyed.";
  }
}

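// Receive one acltdtDataset from the channel (an empty queue is not treated as an error) and, once a
// complete output has been collected, hand it to the registered callback and reset the dataset.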
bool MbufDataHandler::ReceiveAndProcessData(ScopeAclTdtDataset *dataset) {
  aclError status = CALL_ASCEND_API(acltdtReceiveTensor, acl_handle_, dataset->Get(), timeout_);
  if (status != ACL_SUCCESS) {
    if (status == ACL_ERROR_RT_QUEUE_EMPTY) {
      return true;
    }
    MS_LOG(ERROR) << "Channel " << channel_name_ << " failed to receive tensor. Error code is " << status;
    return false;
  }

  if (dataset->ProcessDataset(dataset->Get())) {
    func_(*dataset);
    dataset->Reset();
  }

  return true;
}

bool MbufDataHandler::QueryChannelSize(size_t *size) {
  aclError status = CALL_ASCEND_API(acltdtQueryChannelSize, acl_handle_, size);
  if (status != ACL_SUCCESS) {
    MS_LOG(ERROR) << "Channel " << channel_name_ << " failed to QueryChannelSize. Error code is " << status;
    return false;
  }
  return true;
}

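// Worker thread body: keep receiving data until stop is requested, then drain whatever is still
// queued in the channel within stop_time_out before exiting.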
void MbufDataHandler::HandleData() {
  MS_LOG(INFO) << "Channel " << channel_name_ << " starts executing HandleData.";
  ScopeAclTdtDataset scope_acl_dataset;
  if (scope_acl_dataset.Get() == nullptr) {
    MS_LOG(ERROR) << "Channel " << channel_name_ << " failed to create acl dataset.";
    return;
  }

  while (!stop_receive_.load()) {
    if (!ReceiveAndProcessData(&scope_acl_dataset)) {
      return;
    }
  }

  // Drain the data left in the channel after stop is requested, but wait no longer than stop_time_out.
  size_t channel_size = 0;
  if (!QueryChannelSize(&channel_size)) {
    return;
  }
  auto start = std::chrono::steady_clock::now();
  while (channel_size > 0 && std::chrono::steady_clock::now() - start < stop_time_out) {
    if (!ReceiveAndProcessData(&scope_acl_dataset)) {
      return;
    }
    if (!QueryChannelSize(&channel_size)) {
      return;
    }
  }
  if (channel_size > 0) {
    MS_LOG(ERROR) << "Channel " << channel_name_ << " has stopped receiving data, and " << channel_size
                  << " pieces of data have not been received.";
    return;
  }
}
}  // namespace mindspore::device::ascend
