• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "plugin/device/ascend/hal/device/ascend_data_queue.h"
18 #include <string>
19 #include <map>
20 #include <utility>
21 #include "graph/types.h"
22 #include "include/backend/data_queue/data_queue_mgr.h"
23 #include "include/common/utils/python_adapter.h"
24 #include "utils/log_adapter.h"
25 #include "ops/structure_op_name.h"
26 #include "plugin/device/ascend/hal/common/ascend_utils.h"
27 #include "runtime/device/kernel_runtime.h"
28 #include "runtime/device/kernel_runtime_manager.h"
29 #include "include/backend/distributed/ps/ps_cache/ps_data_prefetch.h"
30 #include "include/backend/distributed/embedding_cache/embedding_cache_utils.h"
31 #include "transform/symbol/acl_rt_symbol.h"
32 #include "transform/symbol/acl_tdt_symbol.h"
33 #include "transform/symbol/symbol_utils.h"
34 
35 namespace mindspore {
36 namespace device {
37 namespace {
38 const std::map<aclDataType, std::string> kAclTypeToString = {
39   {ACL_INT8, "int8"},       {ACL_UINT8, "uint8"},   {ACL_INT16, "int16"},    {ACL_UINT16, "uint16"},
40   {ACL_INT32, "int32"},     {ACL_UINT32, "uint32"}, {ACL_INT64, "int64"},    {ACL_UINT64, "uint64"},
41   {ACL_FLOAT16, "float16"}, {ACL_FLOAT, "float32"}, {ACL_DOUBLE, "float64"}, {ACL_BOOL, "bool"}};
42 
__anonb1681ed20202() 43 const std::map<std::string, aclDataType> kStringTypeToAclType = []() -> std::map<std::string, aclDataType> {
44   std::map<std::string, aclDataType> ret;
45   for (const auto &[acl_type, type_str] : kAclTypeToString) {
46     ret.emplace(type_str, acl_type);
47   }
48   return ret;
49 }();
50 
51 std::vector<std::pair<void **, std::thread *>> g_acl_handle_map = {};
52 
53 std::mutex g_acl_destroy_all_mutex = {};
54 bool g_acl_destroy_all = false;
55 
GetAclDataType(const std::string & str_type,aclDataType * acl_type)56 bool GetAclDataType(const std::string &str_type, aclDataType *acl_type) {
57   MS_EXCEPTION_IF_NULL(acl_type);
58   auto iter = kStringTypeToAclType.find(str_type);
59   if (iter == kStringTypeToAclType.end()) {
60     MS_LOG(EXCEPTION) << "Invalid type " << str_type;
61   }
62   *acl_type = iter->second;
63   return true;
64 }
65 
CheckRtRetWithError(aclError error,const std::string & msg)66 void CheckRtRetWithError(aclError error, const std::string &msg) {
67   if (error != ACL_ERROR_NONE) {
68     MS_LOG(ERROR) << "Rt error: " << msg << " | Error number: " << error;
69   }
70 }
71 
IsGetNextOp(const std::string & op_name)72 bool IsGetNextOp(const std::string &op_name) { return op_name == kGetNextOpName || op_name == kDynamicGetNextV2OpName; }
73 }  // namespace
74 
75 namespace tdt_handle {
AddHandle(acltdtChannelHandle ** handle,std::thread * use_thread)76 void AddHandle(acltdtChannelHandle **handle, std::thread *use_thread) {
77   void **void_handle = reinterpret_cast<void **>(handle);
78   if (*handle == nullptr) {
79     return;
80   }
81 
82   for (auto iter = g_acl_handle_map.cbegin(); iter != g_acl_handle_map.cend(); ++iter) {
83     if (iter->first == void_handle) {
84       return;
85     }
86   }
87 
88   g_acl_handle_map.emplace_back(void_handle, use_thread);
89   {
90     std::lock_guard<std::mutex> lock(g_acl_destroy_all_mutex);
91     g_acl_destroy_all = false;
92   }
93 }
94 
DelHandle(acltdtChannelHandle ** handle)95 void DelHandle(acltdtChannelHandle **handle) {
96   void **void_handle = reinterpret_cast<void **>(handle);
97   for (auto iter = g_acl_handle_map.begin(); iter != g_acl_handle_map.end();) {
98     if (iter->first == void_handle) {
99       iter = g_acl_handle_map.erase(iter);
100     } else {
101       ++iter;
102     }
103   }
104 }
105 
DestroyHandle()106 bool DestroyHandle() {
107   std::lock_guard<std::mutex> lock(g_acl_destroy_all_mutex);
108   bool destroy_all = true;
109   for (auto &item : g_acl_handle_map) {
110     acltdtChannelHandle **handle = reinterpret_cast<acltdtChannelHandle **>(item.first);
111     if (*handle != nullptr) {
112       aclError stop_status = CALL_ASCEND_API(acltdtStopChannel, *handle);
113       if (stop_status != ACL_SUCCESS) {
114         MS_LOG(ERROR) << "Failed stop acl data channel and the stop status is " << stop_status << std::endl;
115         return false;
116       }
117       if (item.second != nullptr && item.second->joinable()) {
118         item.second->join();
119       }
120       if (CALL_ASCEND_API(acltdtDestroyChannel, *handle) != ACL_SUCCESS) {
121         MS_LOG(INFO) << "acltdtDestroyChannel failed.";
122         destroy_all = false;
123       } else {
124         *handle = nullptr;
125       }
126     }
127   }
128 
129   // clear the map container when all the handle has been destroyed
130   if (destroy_all) {
131     g_acl_handle_map.clear();
132     g_acl_destroy_all = true;
133   }
134   return destroy_all;
135 }
136 
IsClosed()137 bool IsClosed() {
138   std::lock_guard<std::mutex> lock(g_acl_destroy_all_mutex);
139   return g_acl_destroy_all;
140 }
141 }  // namespace tdt_handle
142 
AscendDataQueueDynamic(const std::string & channel_name,const size_t capacity)143 AscendDataQueueDynamic::AscendDataQueueDynamic(const std::string &channel_name, const size_t capacity)
144     : DataQueue(channel_name, capacity), stream_(nullptr), node_info_(nullptr) {
145   auto context_key = device_context_->device_context_key();
146   auto runtime_instance =
147     device::KernelRuntimeManager::Instance().GetKernelRuntime(context_key.device_name_, context_key.device_id_);
148   node_info_ = std::make_unique<NodeInfo[]>(capacity);
149   stream_ = runtime_instance->compute_stream();
150 }
151 
Push(std::vector<DataQueueItem> data)152 DataQueueStatus AscendDataQueueDynamic::Push(std::vector<DataQueueItem> data) {
153   for (size_t i = 0; i < data.size(); i++) {
154     auto &item = data[i];
155     if (item.data_ptr == nullptr) {
156       MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr << ", len: " << item.data_len;
157       return DataQueueStatus::ERROR_INPUT;
158     }
159     void *addr = device_context_->device_res_manager_->AllocateMemory(item.data_len);
160     if (addr == nullptr) {
161       MS_LOG(ERROR) << "Allocate device memory of data queue failed";
162     }
163     CheckRtRetWithError(CALL_ASCEND_API(aclrtMemcpyAsync, addr, item.data_len, item.data_ptr, item.data_len,
164                                         ACL_MEMCPY_HOST_TO_DEVICE, stream_),
165                         "Rt Memcpy Error");
166     item.device_addr = addr;
167   }
168   CheckRtRetWithError(CALL_ASCEND_API(aclrtSynchronizeStreamWithTimeout, stream_, -1),
169                       "Call runtime aclrtSynchronizeStreamWithTimeout failed");
170   node_info_[tail_].data_ = std::move(data);
171   tail_ = (tail_ + 1) % (capacity_);
172   ++size_;
173   return DataQueueStatus::SUCCESS;
174 }
175 
Front(std::vector<DataQueueItem> * data) const176 DataQueueStatus AscendDataQueueDynamic::Front(std::vector<DataQueueItem> *data) const {
177   for (auto &item : node_info_[head_].data_) {
178     host_release_(item.data_ptr, item.worker_id);
179   }
180   *data = node_info_[head_].data_;
181   return DataQueueStatus::SUCCESS;
182 }
183 
Pop()184 DataQueueStatus AscendDataQueueDynamic::Pop() {
185   head_ = (head_ + 1) % (capacity_);
186   --size_;
187   return DataQueueStatus::SUCCESS;
188 }
189 
AscendTdtQueue(const std::string & channel_name)190 AscendTdtQueue::AscendTdtQueue(const std::string &channel_name) : DataQueue(channel_name, 0), acl_handle_(nullptr) {
191   // Init ErrorManager
192   if (!ascend::ErrorManagerAdapter::Init()) {
193     MS_LOG(WARNING) << "[Internal Error] Init ErrorManager failed.";
194   }
195   // get device id
196   MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
197   device_id_ = MsContext::GetInstance()->get_param<uint32_t>(MS_CTX_DEVICE_ID);
198 
199   aclError ret = CALL_ASCEND_API(aclrtSetDevice, device_id_);
200   if (ret != ACL_ERROR_NONE) {
201     MS_LOG(ERROR) << "Acl open device " << device_id_ << " failed.";
202   }
203 
204 #if defined(ENABLE_PYTHON) && !defined(ENABLE_ANDROID)
205   // There is a python flag in MindSpore to recognize if the runtime env is python.
206   // If we only use MD feature, python_env_flag will not set to true,
207   // then init.cc will not call ClearResAtexit at the end of MindSpore to clean resource.
208   // The original case is [only MD + mbuf + device_queue + send], the ascend stream release
209   // failed if we don't call ClearResAtexit first.
210   mindspore::python_adapter::set_python_env_flag(true);
211 #endif
212 
213   // create acl tdt handle
214   if (!channel_name_.empty()) {
215     // When "capacity" is too large, device memory will be exploded
216     size_t data_queue_capacity = 128;
217     std::string env_capacity_str = common::GetEnv("MS_DATASET_SINK_QUEUE");
218     if (!env_capacity_str.empty()) {
219       int32_t env_capacity = atoi(env_capacity_str.c_str());
220       if (env_capacity <= 0) {
221         MS_LOG(EXCEPTION) << "Invalid data queue capacity.#umsg#User Help Message:#umsg#"
222                              "Expect env variable MS_DATASET_SINK_QUEUE > 0.";
223       }
224       data_queue_capacity = env_capacity;
225     }
226     // Create device channel
227     acl_handle_ =
228       CALL_ASCEND_API(acltdtCreateChannelWithCapacity, device_id_, channel_name_.c_str(), data_queue_capacity);
229     if (acl_handle_ != nullptr) {
230       MS_LOG(INFO) << "Select MBUF channel, the capacity of data queue is: " << data_queue_capacity;
231       queue_type_ = "Ascend_MBUF";
232     } else {
233       MS_LOG(INFO) << "Select TDT channel.";
234       acl_handle_ = CALL_ASCEND_API(acltdtCreateChannel, device_id_, channel_name_.c_str());
235       queue_type_ = "Ascend_TDT";
236       if (acl_handle_ == nullptr) {
237         MS_LOG(EXCEPTION) << "Create channel for sending data failed.#umsg#User Help Message:#umsg#"
238                              "Please check DEVICE ID setting, DEVICE ID that passed into dataset"
239                              "(from context) and training process should be the same.";
240       }
241     }
242     tdt_handle::AddHandle(&acl_handle_, nullptr);
243   }
244 
245   // a wingman of tdt to help with transferring data shapes on host
246   auto wingman_queue = std::make_shared<BlockingQueue>();
247   std::shared_ptr<DataQueue> data_queue = std::make_shared<WingmanQueue>(channel_name);
248   auto rt = wingman_queue->Create(data_queue);
249   if (rt != DataQueueStatus::SUCCESS) {
250     MS_LOG(EXCEPTION) << "Wingman queue: " << channel_name << "create failed: " << rt;
251   }
252   DataQueueMgr::GetInstance().Manage(channel_name, wingman_queue);
253 }
254 
~AscendTdtQueue()255 AscendTdtQueue::~AscendTdtQueue() {
256   if (acl_handle_ != nullptr) {
257     if (CALL_ASCEND_API(acltdtDestroyChannel, acl_handle_) != ACL_SUCCESS) {
258       MS_LOG(ERROR) << "Failed to destroy channel for tdt queue. The details refer to 'Ascend Error Message'.";
259     } else {
260       tdt_handle::DelHandle(&acl_handle_);
261       acl_handle_ = nullptr;
262     }
263   }
264   if (DataQueueMgr::GetInstance().IsCreated(channel_name_)) {
265     DataQueueMgr::GetInstance().Free(channel_name_);
266   }
267   aclError rt_ret = CALL_ASCEND_API(aclrtResetDevice, device_id_);
268   if (rt_ret != ACL_ERROR_NONE) {
269     MS_LOG(ERROR) << "Reset device " << device_id_ << " failed.";
270   }
271 }
272 
QueryQueueSize() const273 size_t AscendTdtQueue::QueryQueueSize() const {
274   size_t size = 0;
275   if (!IsOpen()) {
276     MS_LOG(INFO) << "Mbuf channel has been closed, should not query size.";
277     return 0;
278   }
279   auto status = CALL_ASCEND_API(acltdtQueryChannelSize, acl_handle_, &size);
280   if (status != ACL_SUCCESS) {
281     MS_LOG(EXCEPTION) << "Unable to query real-time size of Mbuf channel: " << channel_name_
282                       << ", error code: " << status;
283   }
284   return size;
285 }
286 
IsOpen() const287 bool AscendTdtQueue::IsOpen() const { return !tdt_handle::IsClosed(); }
288 
Push(std::vector<DataQueueItem> data)289 DataQueueStatus AscendTdtQueue::Push(std::vector<DataQueueItem> data) {
290   MS_LOG(DEBUG) << "TDT channel name is " << channel_name_ << ".";
291   acltdtDataset *acl_dataset = nullptr;
292   auto ret = Translate(data, &acl_dataset);
293   if (!ret) {
294     DestroyAclDataset(acl_dataset);
295     MS_LOG(ERROR) << "Converting into TDT tensor failed!";
296     return DataQueueStatus::INTERNAL_ERROR;
297   }
298 
299   // Data prefetch only when PS mode enables cache.
300   if (CALL_ASCEND_API(acltdtGetDatasetSize, acl_dataset) > 0) {
301     acltdtDataItem *item0 = CALL_ASCEND_API(acltdtGetDataItem, acl_dataset, 0);
302     std::string item_type;
303     ParseType(CALL_ASCEND_API(acltdtGetDataTypeFromItem, item0), &item_type);
304   }
305   auto status = CALL_ASCEND_API(acltdtSendTensor, acl_handle_, acl_dataset, -1);
306   DestroyAclDataset(acl_dataset);
307   if (status != ACL_SUCCESS) {
308     // if the device_queue thread had been interrupted by master, just print warning and return success
309     if (tdt_handle::IsClosed()) {
310       MS_LOG(WARNING) << "Device queue thread had been interrupted by TdtHandle::DestroyHandle, you can ignore "
311                       << "the above error: 'failed to send...'. In this scenario, the training ends first without "
312                       << "using all epoch(s) data, and the data preprocessing is blocked by the data "
313                       << "transmission channel on the device side. So we force the data transmission channel to stop.";
314       return DataQueueStatus::SUCCESS;
315     }
316     MS_LOG(EXCEPTION) << "Tdt Send data failed. The details refer to 'Ascend Error Message'.";
317   }
318   auto wingman = DataQueueMgr::GetInstance().GetDataQueue(channel_name_);
319   if (wingman != nullptr && wingman->IsOpen() && !data.empty()) {
320     (void)wingman->Push(data);
321   }
322   return DataQueueStatus::SUCCESS;
323 }
324 
ParseType(aclDataType acl_data_type,std::string * data_type) const325 void AscendTdtQueue::ParseType(aclDataType acl_data_type, std::string *data_type) const {
326   auto type_iter = kAclTypeToString.find(acl_data_type);
327   if (type_iter == kAclTypeToString.end()) {
328     MS_LOG(EXCEPTION) << "Got unsupported acl datatype: " << acl_data_type;
329   }
330   *data_type = type_iter->second;
331 }
332 
Translate(const std::vector<DataQueueItem> & data,acltdtDataset ** output_acl_dataset) const333 bool AscendTdtQueue::Translate(const std::vector<DataQueueItem> &data, acltdtDataset **output_acl_dataset) const {
334   auto acl_dataset = CALL_ASCEND_API(acltdtCreateDataset);
335   if (acl_dataset == nullptr) {
336     MS_LOG(ERROR) << "Create tdt dataset failed.";
337     return false;
338   }
339   bool status = AssembleTensor2AclDataset(data, acl_dataset);
340   if (!status) {
341     DestroyAclDataset(acl_dataset);
342     MS_LOG(ERROR) << "Assemble tensor row to tdt dataset failed.";
343     return false;
344   }
345 
346   *output_acl_dataset = acl_dataset;
347   return true;
348 }
349 
AssembleTensor2AclDataset(const std::vector<DataQueueItem> & data,acltdtDataset * acl_dataset) const350 bool AscendTdtQueue::AssembleTensor2AclDataset(const std::vector<DataQueueItem> &data,
351                                                acltdtDataset *acl_dataset) const {
352   if (data.empty()) {
353     acltdtDataItem *acl_data = CALL_ASCEND_API(acltdtCreateDataItem, acltdtTensorType::ACL_TENSOR_DATA_END_OF_SEQUENCE,
354                                                nullptr, 0, ACL_BOOL, nullptr, 0);
355     if (acl_data == nullptr) {
356       MS_LOG(ERROR) << "Create data item failed when send empty data.";
357       return false;
358     }
359     if (CALL_ASCEND_API(acltdtAddDataItem, acl_dataset, acl_data) != ACL_SUCCESS) {
360       if (CALL_ASCEND_API(acltdtDestroyDataItem, acl_data) != ACL_SUCCESS) {
361         MS_LOG(ERROR) << "Destroy data item failed when send empty data.";
362       }
363       MS_LOG(ERROR) << "Add data item to tdt dataset failed when send data.";
364       return false;
365     }
366     return true;
367   }
368 
369   for (const auto &ts : data) {
370     aclDataType acl_type;
371     acltdtDataItem *acl_data = nullptr;
372     if (!GetAclDataType(ts.data_type, &acl_type)) {
373       MS_LOG(ERROR) << "Convert type " << ts.data_type << " to acl type failed.";
374       return false;
375     }
376 
377     const auto &shape = ts.shapes;
378     std::string shape_str = "[";
379     for (auto dim : shape) {
380       (void)shape_str.append(std::to_string(dim)).append(",");
381     }
382     shape_str.pop_back();
383     (void)shape_str.append("]");
384 
385     void *data_ptr = ts.data_ptr;
386     size_t data_size = ts.data_len;
387 
388     acl_data = CALL_ASCEND_API(acltdtCreateDataItem, acltdtTensorType::ACL_TENSOR_DATA_TENSOR,
389                                (shape.empty() ? nullptr : &shape[0]), shape.size(), acl_type, data_ptr, data_size);
390     if (acl_data == nullptr) {
391       MS_LOG(ERROR) << "Create data item failed when send data.";
392       return false;
393     }
394     if (CALL_ASCEND_API(acltdtAddDataItem, acl_dataset, acl_data) != ACL_SUCCESS) {
395       if (CALL_ASCEND_API(acltdtDestroyDataItem, acl_data) != ACL_SUCCESS) {
396         MS_LOG(ERROR) << "Destroy data item failed when send data with type ACL_TENSOR_DATA_TENSOR.";
397       }
398       MS_LOG(INFO) << "Add data item to tdt dataset failed when send data.";
399       return false;
400     }
401 
402     MS_LOG(DEBUG) << "TDT data type is TDT_TENSOR, tensor type is " << acl_type << ", tensor shape is " << shape_str
403                   << ", data length is " << data_size << ".";
404   }
405 
406   return true;
407 }
408 
DestroyAclDataset(acltdtDataset * acl_dataset,bool include_data_item) const409 void AscendTdtQueue::DestroyAclDataset(acltdtDataset *acl_dataset, bool include_data_item) const {
410   if (include_data_item) {
411     for (size_t i = 0; i < CALL_ASCEND_API(acltdtGetDatasetSize, acl_dataset); i++) {
412       auto data_item = CALL_ASCEND_API(acltdtGetDataItem, acl_dataset, i);
413       if (CALL_ASCEND_API(acltdtDestroyDataItem, data_item) != ACL_SUCCESS) {
414         MS_LOG(EXCEPTION) << "Destroy data item failed when send data.";
415       }
416     }
417   }
418 
419   if (CALL_ASCEND_API(acltdtDestroyDataset, acl_dataset) != ACL_SUCCESS) {
420     MS_LOG(EXCEPTION) << "Destroy tdt dataset failed when send data.";
421   }
422 }
423 
Push(const std::vector<DataQueueItem> data)424 DataQueueStatus WingmanQueue::Push(const std::vector<DataQueueItem> data) {
425   queue_.emplace(data);
426   return DataQueueStatus::SUCCESS;
427 }
428 
Pop()429 DataQueueStatus WingmanQueue::Pop() {
430   queue_.pop();
431   return DataQueueStatus::SUCCESS;
432 }
433 
Front(std::vector<DataQueueItem> * data) const434 DataQueueStatus WingmanQueue::Front(std::vector<DataQueueItem> *data) const {
435   *data = queue_.front();
436   return DataQueueStatus::SUCCESS;
437 }
438 
FrontAsync(std::vector<DataQueueItem> * data) const439 DataQueueStatus WingmanQueue::FrontAsync(std::vector<DataQueueItem> *data) const { return this->Front(data); }
440 
Close()441 void WingmanQueue::Close() {
442   queue_ = {};
443   closed_ = true;
444 }
445 
GetTdtWingManQueue(const PrimitivePtr & prim)446 std::shared_ptr<BlockingQueue> GetTdtWingManQueue(const PrimitivePtr &prim) {
447   if (!IsGetNextOp(prim->name())) return nullptr;
448   auto queue_name = GetValue<std::string>(prim->GetAttr("shared_name"));
449   if (!DataQueueMgr::GetInstance().IsCreated(queue_name)) {
450     return nullptr;
451   }
452   return DataQueueMgr::GetInstance().GetDataQueue(queue_name);
453 }
454 
GetTdtWingManQueue(const std::shared_ptr<AnfNode> & node)455 std::shared_ptr<BlockingQueue> GetTdtWingManQueue(const std::shared_ptr<AnfNode> &node) {
456   if (!common::AnfAlgo::IsGetNextNode(node)) return nullptr;
457   return GetTdtWingManQueue(common::AnfAlgo::GetCNodePrimitive(node));
458 }
459 
CloseTdtWingManQueue(const PrimitivePtr & prim)460 void CloseTdtWingManQueue(const PrimitivePtr &prim) {
461   if (!IsGetNextOp(prim->name())) return;
462   auto wingman = GetTdtWingManQueue(prim);
463   if (wingman && wingman->IsOpen()) {
464     wingman->Close();
465   }
466 }
467 
CloseTdtWingManQueue(const std::shared_ptr<AnfNode> & node)468 void CloseTdtWingManQueue(const std::shared_ptr<AnfNode> &node) {
469   if (!common::AnfAlgo::IsGetNextNode(node)) return;
470   return CloseTdtWingManQueue(common::AnfAlgo::GetCNodePrimitive(node));
471 }
472 
473 namespace {
CreateAscendDataQueue(const std::string & channel_name,bool dynamic_shape,size_t capacity,const std::vector<size_t> &)474 std::shared_ptr<DataQueue> CreateAscendDataQueue(const std::string &channel_name, bool dynamic_shape, size_t capacity,
475                                                  const std::vector<size_t> &) {
476   return std::make_shared<AscendTdtQueue>(channel_name);
477 }
478 
479 REGISTER_DATA_QUEUE_CREATOR(kAscendDevice, CreateAscendDataQueue);
480 struct DevicePlugFuncRegister {
DevicePlugFuncRegistermindspore::device::__anonb1681ed20311::DevicePlugFuncRegister481   DevicePlugFuncRegister() noexcept {
482     DataQueueMgr::SetDestoryTdtHandleHandler([]() -> bool { return tdt_handle::DestroyHandle(); });
483   }
484 } ascend_device_func_register;
485 }  // namespace
486 }  // namespace device
487 }  // namespace mindspore
488