• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "minddata/dataset/engine/datasetops/data_queue_op.h"
18 
19 #include <algorithm>
20 #include <iostream>
21 #include <memory>
22 #include <unordered_map>
23 
24 #include "minddata/dataset/engine/gpu_item_connector.h"
25 #include "minddata/dataset/engine/dataset_iterator.h"
26 #include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h"
27 #include "minddata/dataset/util/status.h"
28 #include "minddata/dataset/util/task_manager.h"
29 #ifdef WITH_BACKEND
30 #include "mindspore/ccsrc/include/backend/data_queue/data_queue_mgr.h"
31 #include "include/backend/distributed/embedding_cache/embedding_cache_utils.h"
32 #include "include/backend/distributed/embedding_cache/data_queue_manager.h"
33 #include "utils/ms_context.h"
34 #endif
35 namespace mindspore {
36 namespace dataset {
37 #ifdef WITH_BACKEND
38 using distributed::DataQueueManager;
39 using distributed::IdDataInfo;
40 using distributed::IndexDataInfo;
41 #endif
42 
43 namespace {
ConvertTensorRowToDataQueueItem(const TensorRow & row)44 std::vector<DataQueueItem> ConvertTensorRowToDataQueueItem(const TensorRow &row) {
45   std::vector<device::DataQueueItem> items;
46   for (auto &i : row) {
47     device::DataQueueItem data_item;
48     data_item.data_len = static_cast<size_t>(i->SizeInBytes());
49     data_item.shapes = i->shape().AsVector();
50     data_item.data_ptr = const_cast<void *>(static_cast<const void *>(i->GetBuffer()));
51     data_item.data_type = i->type().ToString();
52     items.emplace_back(std::move(data_item));
53   }
54   return items;
55 }
56 
57 #ifdef WITH_BACKEND
DeepCopyConvertTensorRowToDataQueueItem(const TensorRow & row)58 std::vector<DataQueueItem> DeepCopyConvertTensorRowToDataQueueItem(const TensorRow &row) {
59   std::vector<device::DataQueueItem> items;
60   for (auto &i : row) {
61     device::DataQueueItem data_item;
62     data_item.data_len = static_cast<size_t>(i->SizeInBytes());
63     data_item.shapes = i->shape().AsVector();
64     data_item.data_ptr = malloc(data_item.data_len);
65     MS_EXCEPTION_IF_NULL(data_item.data_ptr);
66     auto ret = memcpy_s(data_item.data_ptr, data_item.data_len, i->GetBuffer(), data_item.data_len);
67     if (ret != EOK) {
68       MS_LOG(EXCEPTION) << "Memcpy for data queue item failed, errno[" << ret << "]";
69     }
70     data_item.data_type = i->type().ToString();
71 
72     items.emplace_back(std::move(data_item));
73   }
74   return items;
75 }
76 #endif
77 
// Push a default-constructed IdDataInfo with end_of_epoch_ set onto the id queue registered for
// `channel_name`. The body is compiled only when WITH_BACKEND is defined; otherwise this is a no-op.
void PushEpochEndToQueue(const std::string &channel_name) {
#ifdef WITH_BACKEND
  const auto &id_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name).first;
  MS_EXCEPTION_IF_NULL(id_queue);

  auto *epoch_end_marker = new IdDataInfo();
  MS_EXCEPTION_IF_NULL(epoch_end_marker);
  epoch_end_marker->end_of_epoch_ = true;

  id_queue->Push(epoch_end_marker);
#endif
}
88 
// Push a default-constructed IdDataInfo with end_of_file_ set onto the id queue registered for
// `channel_name`. The body is compiled only when WITH_BACKEND is defined; otherwise this is a no-op.
void PushFileEndToQueue(const std::string &channel_name) {
#ifdef WITH_BACKEND
  const auto &id_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name).first;
  MS_EXCEPTION_IF_NULL(id_queue);

  auto *file_end_marker = new IdDataInfo();
  MS_EXCEPTION_IF_NULL(file_end_marker);
  file_end_marker->end_of_file_ = true;

  id_queue->Push(file_end_marker);
#endif
}
99 }  // namespace
DataQueueOp(const std::string channel_name,DeviceType device_type,int32_t device_id,bool send_epoch_end,int32_t total_batch,bool create_data_info_queue)100 DataQueueOp::DataQueueOp(const std::string channel_name, DeviceType device_type, int32_t device_id, bool send_epoch_end,
101                          int32_t total_batch, bool create_data_info_queue)
102     : PipelineOp(1),
103       ascend_keep_waiting_(true),
104       num_workers_(kDeviceQueGpuNumThreads),
105       queue_capacity_(kDeviceQueGpuQueueCapacity),
106       channel_name_(channel_name),
107       device_type_(device_type),
108       device_id_(device_id),
109       send_epoch_end_(send_epoch_end),
110       stop_send_(false),
111       send_finished_(false),
112       total_batch_(total_batch),
113       create_data_info_queue_(create_data_info_queue),
114       data_info_queue_ptr_(nullptr),
115       first_fetch_flag_(false),
116       first_push_flag_(false),
117       send_summary_({}) {
118   std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
119   dynamic_shape_ = cfg->dynamic_shape();
120 
121   // Be careful when try to modified these num_workers_ and queue_capacity_,
122   // and we suggest num_workers_ * queue_capacity_ not greater than 16, because
123   // one worker one circular_pool with 1G pin memory, so num_workers_ * queue_capacity_
124   // must limit to avoid memory overload
125 #ifdef WITH_BACKEND
126   MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
127   if (MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kAscendDevice) {
128     ascend_data_queue_ =
129       device::DataQueueMgr::GetInstance().CreateDataQueue(kAscendDevice, channel_name, dynamic_shape_, 0, {});
130   }
131   enable_prefetch_cache_pipeline_ = distributed::EmbeddingCacheTableManager::GetInstance().enable_pipeline();
132 #endif
133 #ifdef ENABLE_DUMP_IR
134   md_channel_info_ = std::make_shared<MDChannelInfo>(channel_name_);
135 #endif
136 }
137 
~DataQueueOp()138 DataQueueOp::~DataQueueOp() {
139 #ifdef ENABLE_DUMP_IR
140   // BFS iter execution tree to get send epoch from EpochControl Op
141   std::vector<std::shared_ptr<DatasetOp>> child_node = this->Children();
142   size_t node_index = 0;
143   int32_t num_epochs = 0;
144   while (child_node.size() != 0 && node_index < child_node.size()) {
145     auto node = child_node[node_index];
146     if (node->Name() == kEpochCtrlOp) {
147       EpochCtrlOp *op = dynamic_cast<EpochCtrlOp *>(node.get());
148       if (op != nullptr) {
149         num_epochs = op->NumEpochs();
150         break;
151       }
152     }
153     auto child_child_node = node->Children();
154     if (!child_child_node.empty()) {
155       (void)std::copy(child_child_node.begin(), child_child_node.end(), std::back_inserter(child_node));
156     }
157     ++node_index;
158   }
159 
160   // won't print rdr if call stop_send manually or send infinite epoch
161   std::string rdr_msg = md_channel_info_->ToFormatString();
162   if (!send_finished_ && !rdr_msg.empty() && num_epochs != -1) {
163     MS_LOG(WARNING) << rdr_msg;
164   }
165   MS_LOG(INFO) << "Release the DataQueueOp, channel name: " << channel_name_;
166 #endif
167 }
168 
ReleaseData(void * addr,int32_t worker_id)169 void DataQueueOp::ReleaseData(void *addr, int32_t worker_id) {
170   if (addr != nullptr && worker_id >= 0 && worker_id < pool_.size()) {
171     pool_[worker_id]->Deallocate(addr);
172   }
173 }
174 
EoeReceived(int32_t)175 Status DataQueueOp::EoeReceived(int32_t) {
176   state_ = OpState::kDeOpIdle;
177   return Status::OK();
178 }
179 
FilterMetadata(TensorRow * row) const180 Status DataQueueOp::FilterMetadata(TensorRow *row) const {
181   std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
182   TensorRow output;
183   TensorRow tmp = *row;
184   std::vector<size_t> to_keep_indices;
185   for (auto column : current_name_id_map) {
186     std::string column_name = column.first;
187     // Need to filter meta column start with kDftMetaColumnPrefix
188     size_t pos = column_name.find(kDftMetaColumnPrefix);
189     if (pos != std::string::npos && pos == 0) {
190       continue;
191     }
192     to_keep_indices.push_back(column.second);
193   }
194   if (to_keep_indices.size() == 0) {
195     std::string err_msg = "No effective column found, maybe all columns are meta column and will be filtered. ";
196     err_msg += "If you want to output meta column please rename column name to a new one which is not start with ";
197     err_msg += "\"" + std::string(kDftMetaColumnPrefix) + "\"";
198     RETURN_STATUS_UNEXPECTED(err_msg);
199   }
200   std::sort(to_keep_indices.begin(), to_keep_indices.end());
201   (void)std::transform(to_keep_indices.begin(), to_keep_indices.end(), std::back_inserter(output),
202                        [&tmp](const auto &it) { return std::move(tmp[it]); });
203   *row = std::move(output);
204   return Status::OK();
205 }
206 
CheckExceptions(const TensorRow & row) const207 Status DataQueueOp::CheckExceptions(const TensorRow &row) const {
208   // this method checks if the row meets the conditions to be sent to TDT
209   for (const auto &item : row) {
210     CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(),
211                                  "Invalid datatype, cannot send string, or Python dict to device.");
212     CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Invalid data, the data send to device is null.");
213   }
214   return Status::OK();
215 }
216 
operator ()()217 Status DataQueueOp::operator()() {
218 #ifndef ENABLE_SECURITY
219   RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Detect first batch",
220                                                       std::bind(&DataQueueOp::DetectFirstBatch, this), nullptr, id()));
221 #endif
222   TaskManager::FindMe()->Post();
223   child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
224 
225 #ifdef ENABLE_DUMP_IR
226   if (md_channel_info_ == nullptr) {
227     RETURN_STATUS_UNEXPECTED("[Internal ERROR] RDR module init failed.");
228   }
229 #endif
230   if (device_type_ == DeviceType::Ascend) {
231 #ifdef WITH_BACKEND
232     if (create_data_info_queue_) {
233       // This place has a race condition with GetDataInfo, so the first one
234       // arrive here will do the initialize work.
235       {
236         std::unique_lock<std::mutex> lock(data_info_mutex_);
237         if (data_info_queue_ptr_ == nullptr) {
238           data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
239           RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
240         }
241       }
242     }
243     RETURN_IF_NOT_OK(SendDataToAscend());
244 
245 #endif
246   } else if (device_type_ == DeviceType::GPU) {
247 #ifdef WITH_BACKEND
248     if (create_data_info_queue_) {
249       // This place has a race condition with GetDataInfo, so the first one
250       // arrive here will do the initialize work.
251       {
252         std::unique_lock<std::mutex> lock(data_info_mutex_);
253         if (data_info_queue_ptr_ == nullptr) {
254           data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
255           RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
256         }
257       }
258     }
259     RETURN_IF_NOT_OK(SendDataToGPU());
260 #endif
261   } else if (device_type_ == DeviceType::CPU) {
262     RETURN_IF_NOT_OK(SendDataToCPU());
263   }
264 
265   return Status::OK();
266 }
267 
PushPrefetchDataToGPU()268 Status DataQueueOp::PushPrefetchDataToGPU() {
269 #ifdef WITH_BACKEND
270   TaskManager::FindMe()->Post();
271   const auto &index_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name_).second;
272   MS_EXCEPTION_IF_NULL(index_data_queue);
273   bool eof_flag = false;
274   int64_t send_batch = 0;
275 
276   bool is_profiling_enable = GlobalContext::profiling_manager()->IsProfilingEnable(tree_);
277   uint64_t push_cost;
278 
279   while (!eof_flag && !DataQueueManager::GetInstance().IsClosed() && !device::DataQueueMgr::GetInstance().IsClosed() &&
280          !TaskManager::FindMe()->Interrupted()) {
281     IndexDataInfo *data = index_data_queue->Pop();
282     if (DataQueueManager::GetInstance().IsClosed() || device::DataQueueMgr::GetInstance().IsClosed()) {
283       // Terminate abnormally.
284       break;
285     }
286     MS_EXCEPTION_IF_NULL(data);
287 
288     eof_flag = data->end_of_file_;
289     if (data->data_ != nullptr) {
290       MS_EXCEPTION_IF_NULL(data->items_);
291 
292       auto status = RetryPushData(*(data->items_), is_profiling_enable, &push_cost);
293       delete data->items_;
294       delete data;
295 
296       RETURN_IF_NOT_OK(status);
297 
298       ++send_batch;
299       if (total_batch_ > 0 && send_batch >= total_batch_) {
300         break;
301       }
302     } else {
303       delete data;
304     }
305   }
306 #endif
307   return Status::OK();
308 }
309 
PushDataToGPUCacheQueue(std::vector<device::DataQueueItem> && data_items)310 Status DataQueueOp::PushDataToGPUCacheQueue(std::vector<device::DataQueueItem> &&data_items) {
311 #ifdef WITH_BACKEND
312   std::vector<device::DataQueueItem> *items = new std::vector<device::DataQueueItem>(std::move(data_items));
313   MS_EXCEPTION_IF_NULL(items);
314   if (items->empty()) {
315     MS_LOG(EXCEPTION) << "The data queue item is empty.";
316   }
317   const auto &id_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name_).first;
318   MS_EXCEPTION_IF_NULL(id_data_queue);
319   IdDataInfo *data = new IdDataInfo(items->at(0).data_ptr, items->at(0).data_len, items, false, false);
320 
321   MS_EXCEPTION_IF_NULL(data);
322   id_data_queue->Push(data);
323 #endif
324 
325   return Status::OK();
326 }
327 
PushPrefetchDataToAscend()328 Status DataQueueOp::PushPrefetchDataToAscend() {
329 #ifdef WITH_BACKEND
330   TaskManager::FindMe()->Post();
331   const auto &index_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name_).second;
332   MS_EXCEPTION_IF_NULL(index_data_queue);
333   MS_EXCEPTION_IF_NULL(ascend_data_queue_);
334   bool eof_flag = false;
335   int64_t send_batch = 0;
336   while (!eof_flag && !DataQueueManager::GetInstance().IsClosed() && ascend_data_queue_->IsOpen()) {
337     IndexDataInfo *data = index_data_queue->Pop();
338     if (DataQueueManager::GetInstance().IsClosed() || !ascend_data_queue_->IsOpen()) {
339       // Terminate abnormally.
340       break;
341     }
342     MS_EXCEPTION_IF_NULL(data);
343     device::DataQueueStatus status;
344 
345     bool eoe_flag = data->end_of_epoch_;
346     eof_flag = data->end_of_file_;
347     if (data->data_ != nullptr) {
348       MS_EXCEPTION_IF_NULL(data->items_);
349 
350       if (ascend_data_queue_->QueueType() == "Ascend_MBUF") {
351         size_t batch_data_len =
352           std::accumulate(data->items_->begin(), data->items_->end(), static_cast<size_t>(0),
353                           [](size_t acc, const DataQueueItem &item) { return acc + item.data_len; });
354         RETURN_IF_NOT_OK(WaitForAscendQueue(batch_data_len));
355       }
356 
357       status = ascend_data_queue_->Push(*(data->items_));
358       for (auto &item : *(data->items_)) {
359         MS_EXCEPTION_IF_NULL(item.data_ptr);
360         free(item.data_ptr);
361       }
362       delete data->items_;
363       delete data;
364       if (status != device::DataQueueStatus::SUCCESS) {
365         if (stop_send_) {
366           MS_LOG(INFO) << "stop_send received";
367           return Status::OK();
368         }
369         RETURN_STATUS_ERROR(
370           StatusCode::kMDTDTPushFailure,
371           "TDT Push data into device Failed, check the first error or TraceBack first, more checking advises are: "
372           "1) if training is not ready, error might raised by network computing operator or environment configuration. "
373           "2) other cases, checking info level log or search this error in mindspore's FAQ for detail solution.");
374       }
375       ++send_batch;
376       if (total_batch_ > 0 && send_batch >= total_batch_) {
377         break;
378       }
379     } else {
380       delete data;
381       if (send_epoch_end_ && eoe_flag) {
382         // empty data
383         status = ascend_data_queue_->Push({});
384         bool break_loop = false;
385         RETURN_IF_NOT_OK(CheckPushStatus(status, stop_send_, &send_finished_, &break_loop));
386         if (break_loop) {
387           break;
388         }
389       }
390     }
391   }
392 #endif
393 
394   return Status::OK();
395 }
396 
PushDataToAscendCacheQueue(const TensorRow & curr_row)397 Status DataQueueOp::PushDataToAscendCacheQueue(const TensorRow &curr_row) {
398 #ifdef WITH_BACKEND
399   std::vector<device::DataQueueItem> *items =
400     new std::vector<device::DataQueueItem>(DeepCopyConvertTensorRowToDataQueueItem(curr_row));
401   MS_EXCEPTION_IF_NULL(items);
402   if (items->empty()) {
403     MS_LOG(EXCEPTION) << "The data queue item is empty.";
404   }
405   const auto &id_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name_).first;
406   MS_EXCEPTION_IF_NULL(id_data_queue);
407   IdDataInfo *data =
408     new IdDataInfo(items->at(0).data_ptr, items->at(0).data_len, items, curr_row.eoe(), curr_row.eof());
409 
410   MS_EXCEPTION_IF_NULL(data);
411   id_data_queue->Push(data);
412 
413   if (create_data_info_queue_) {
414     DATA_INFO data_info;
415     (void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
416                          [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
417     RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
418   }
419 #endif
420   return Status::OK();
421 }
422 
WaitForAscendQueue(size_t batch_data_len)423 Status DataQueueOp::WaitForAscendQueue(size_t batch_data_len) {
424   // Queue control logic for mbuf in host, to prevent from hang/exit abnormally
425   // case 1: If mbuf queue memory + next row memory < 2G then continue send, else suspend;
426   // case 2: Based on case 1, if element nums in mbuf < max_queue_size then continue send, else suspend;
427   // case 3: If row memory >= 1G, can only send 1 row each time, queue_size will always in [0, 1];
428   // note:
429   // why need queue control: acltdtSendTensor will hang when queue is full, need to break this thread by ourselves
430   // how about dynamic shape: yes, memory_per_batch_ collect memory of rows in different shapes.
431   // how about row too large(>2G): we can promise the first row will be sent and hang in this while, but we dont
432   //     know if the device will out of memory. If not oom, send next row, otherwise device returns error.
433 
434   // Calculate the memory of next row before sending
435   size_t queue_size = ascend_data_queue_->QueryQueueSize();
436   double row_memory = batch_data_len / 1024. / 1024. / 1024.;
437   memory_per_batch_.push_back(row_memory);
438 
439   const double max_queue_memory = 2.;
440   const size_t max_queue_size = 100;
441   const int64_t send_interval = 1000;
442   RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WaitForAscend"));
443   while ((row_memory + CalMbufQueueMemory(queue_size) >= max_queue_memory || queue_size >= max_queue_size) &&
444          queue_size != 0) {
445     RETURN_IF_INTERRUPTED();
446     MS_LOG(DEBUG) << "Mbuf queue size: " << queue_size << ", max queue limit: " << max_queue_size << ". "
447                   << "Next row memory: " << row_memory << ", Mbuf memory: " << CalMbufQueueMemory(queue_size);
448 
449     queue_size = ascend_data_queue_->QueryQueueSize();
450     std::this_thread::sleep_for(std::chrono::microseconds(send_interval));
451   }
452   RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WaitForAscend"));
453   return Status::OK();
454 }
455 
SendDataToAscend()456 Status DataQueueOp::SendDataToAscend() {
457   MS_LOG(INFO) << "Device queue, sending data to Ascend.";
458   if (enable_prefetch_cache_pipeline_) {
459     RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
460       "Push prefetch data to ascend queue", std::bind(&DataQueueOp::PushPrefetchDataToAscend, this), nullptr, id()));
461   }
462 #ifndef ENABLE_SECURITY
463   uint64_t batch_start_time = 0;
464   uint64_t end_time = 0;
465   uint64_t batch_record_start = 0;
466   uint64_t batch_record_end = 0;
467 #endif
468   int64_t send_batch = 0;
469   int32_t tdt_cost = 0;
470 #ifndef ENABLE_SECURITY
471   int32_t connector_size = 0;
472   int32_t connector_capacity = 0;
473 #endif
474   bool is_break_loop = false;
475 
476   std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
477   int64_t sending_num = cfg->sending_batches();  // Get the current sending_num
478 
479 #ifndef ENABLE_SECURITY
480   std::shared_ptr<DeviceQueueTracing> profiling_node;
481   bool is_profiling_enable = GlobalContext::profiling_manager()->IsProfilingEnable(tree_);
482   if (is_profiling_enable) {
483     std::shared_ptr<Tracing> node;
484     RETURN_IF_NOT_OK(GlobalContext::profiling_manager()->GetTracingNode(kDeviceQueueTracingName, &node));
485     profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
486     batch_start_time = ProfilingTime::GetCurMilliSecond();
487     connector_capacity = ChildOpConnectorCapacity();
488   }
489 #else
490   bool is_profiling_enable = false;
491 #endif
492 #ifdef ENABLE_DUMP_IR
493   RETURN_IF_NOT_OK(md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()));
494   RETURN_IF_NOT_OK(md_channel_info_->RecordPreprocessBatch(0));
495 #endif
496 #ifndef ENABLE_SECURITY
497   batch_record_start = ProfilingTime::GetCurMilliSecond();
498 #endif
499   TensorRow curr_row;
500   RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
501   first_fetch_flag_ = true;
502 
503   MS_LOG(INFO) << "Begin to send data to device, channel name: " << channel_name_;
504 
505   while (!curr_row.eof() && !is_break_loop) {
506     SendInfo send_info_per_epoch;
507     send_info_per_epoch.epoch = op_current_epochs_ + 1;
508     send_summary_.push_back(send_info_per_epoch);
509     while (!curr_row.eoe() && !is_break_loop) {
510       RETURN_IF_NOT_OK(FilterMetadata(&curr_row));
511       RETURN_IF_NOT_OK(CheckExceptions(curr_row));
512       WaitContinueSignal();
513 #ifdef ENABLE_DUMP_IR
514       RETURN_IF_NOT_OK(md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()));
515       RETURN_IF_NOT_OK(md_channel_info_->RecordPreprocessBatch(send_batch));
516       RETURN_IF_NOT_OK(md_channel_info_->RecordPushStartTime());
517       if (send_batch == 0) {
518         // record the push start time for first batch
519         RETURN_IF_NOT_OK(md_channel_info_->RecordPushFirstStartTime());
520       }
521 #endif
522 #ifndef ENABLE_SECURITY
523       DetectPerBatchTime(&batch_record_start, &batch_record_end);
524 #endif
525       PrintBeginInfoWhenFirstBatch(first_push_flag_);
526       // when training stopped, handle might have been destroyed immediately
527       if (ascend_data_queue_ != nullptr && !ascend_data_queue_->IsOpen()) {
528         MS_LOG(WARNING) << "Thread has already been terminated.";
529         is_break_loop = true;
530         continue;
531       }
532       RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "PushToAscend"));
533       if (!enable_prefetch_cache_pipeline_) {
534 #ifdef ENABLE_DUMP_IR
535         if (ascend_data_queue_->QueueType() == "Ascend_MBUF") {
536           RETURN_IF_NOT_OK(md_channel_info_->RecordDeviceQueue(ascend_data_queue_->QueryQueueSize()));
537         }
538         MS_LOG(INFO) << md_channel_info_->ToFormatString();
539 #endif
540         RETURN_IF_NOT_OK(SendRowToTdt(curr_row, is_profiling_enable, &tdt_cost));
541       } else {
542         RETURN_IF_NOT_OK(PushDataToAscendCacheQueue(curr_row));
543       }
544 #ifdef ENABLE_DUMP_IR
545       if (send_batch == 0) {
546         // record the push end time for first batch
547         RETURN_IF_NOT_OK(md_channel_info_->RecordPushFirstEndTime());
548       }
549 #endif
550       RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "PushToAscend", {{"TensorRowFlags", curr_row.FlagName()}}));
551       PrintEndInfoWhenFirstBatch(&first_push_flag_);
552 #ifndef ENABLE_SECURITY
553       ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
554                         connector_capacity, connector_size);
555       batch_record_start = ProfilingTime::GetCurMilliSecond();
556 #endif
557       send_batch++;
558       MS_LOG(INFO) << "Have sent " << send_batch << " batch(es) to device, channel name: " << channel_name_;
559 #ifdef ENABLE_DUMP_IR
560       RETURN_IF_NOT_OK(md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()));
561       RETURN_IF_NOT_OK(md_channel_info_->RecordPreprocessBatch(send_batch));
562       RETURN_IF_NOT_OK(md_channel_info_->RecordPushEndTime());
563 #endif
564 
565       if (total_batch_ > 0 && send_batch >= total_batch_) {
566         is_break_loop = true;
567         break;
568       }
569 
570       // wait when sending num is not 0, and sending num no larger than already sending batch
571       LimitSendingBatches(send_batch, &sending_num, cfg);
572 
573 #ifndef ENABLE_SECURITY
574       RecordProfilingData(is_profiling_enable, false, &connector_size, &connector_capacity, &send_batch);
575 #endif
576       RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
577 #ifndef ENABLE_SECURITY
578       uint64_t batch_fetch_end = ProfilingTime::GetCurMilliSecond();
579 #endif
580       if (ascend_data_queue_->QueueType() == "Ascend_MBUF") {
581         if (!enable_prefetch_cache_pipeline_) {
582           RETURN_IF_NOT_OK(WaitForAscendQueue(static_cast<size_t>(curr_row.SizeInBytes())));
583         }
584       }
585 #ifndef ENABLE_SECURITY
586       uint64_t queue_wait_end = ProfilingTime::GetCurMilliSecond();
587       // Skip the time looping in the mbuf queue control, FetchNextTensorRow time is what we need
588       batch_record_start = batch_record_start + (queue_wait_end - batch_fetch_end);
589 #endif
590     }
591 
592     RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "PushToAscend"));
593     // send epoch end flag: ACL_TENSOR_DATA_END_OF_SEQUENCE to tdt
594     RETURN_IF_NOT_OK(SendEpochEndToAscend(curr_row, is_profiling_enable, &tdt_cost, &is_break_loop));
595     RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "PushToAscend",
596                                       {{"TensorRowFlags", TensorRow(TensorRow::kFlagEOE).FlagName()}}));
597     UpdateRepeatAndEpochCounter();
598 #ifndef ENABLE_SECURITY
599     RecordProfilingData(is_profiling_enable, true, &connector_size, &connector_capacity, &send_batch);
600 #endif
601     RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
602   }
603 
604   if (enable_prefetch_cache_pipeline_) {
605     PushFileEndToQueue(channel_name_);
606   }
607 
608   // now we use this flag to judge whether exception raised.
609   if (stop_send_ || !TaskManager::FindMe()->Interrupted()) {
610     send_finished_ = true;
611   }
612   tree_->SetFinished();
613   MS_LOG(INFO) << "ExecutionTree finished. Device queue sent number of batches: " << send_batch;
614 
615   return Status::OK();
616 }
617 
RecordProfilingData(bool is_profiling_enable,bool end_of_epoch,int32_t * connector_size,int32_t * connector_capacity,const int64_t * send_batch) const618 void DataQueueOp::RecordProfilingData(bool is_profiling_enable, bool end_of_epoch, int32_t *connector_size,
619                                       int32_t *connector_capacity, const int64_t *send_batch) const {
620   if (is_profiling_enable) {
621     *connector_size = ChildOpConnectorSize();
622     *connector_capacity = ChildOpConnectorCapacity();
623   }
624   if (end_of_epoch) {
625     tree_->SetEpochEnd();
626     GlobalContext::profiling_manager()->RecordEndOfEpoch(*send_batch);
627   }
628 }
629 
CalMbufQueueMemory(size_t realtime_queue_size)630 double DataQueueOp::CalMbufQueueMemory(size_t realtime_queue_size) {
631   while (memory_per_batch_.size() > realtime_queue_size) {
632     memory_per_batch_.pop_front();
633   }
634   return std::accumulate(memory_per_batch_.begin(), memory_per_batch_.end(), 0.);
635 }
636 
SendEpochEndToAscend(const TensorRow & curr_row,const bool & is_profiling_enable,int32_t * tdt_cost,bool * is_break_loop)637 Status DataQueueOp::SendEpochEndToAscend(const TensorRow &curr_row, const bool &is_profiling_enable, int32_t *tdt_cost,
638                                          bool *is_break_loop) {
639   RETURN_UNEXPECTED_IF_NULL(tdt_cost);
640   RETURN_UNEXPECTED_IF_NULL(is_break_loop);
641   if (curr_row.eoe() && send_epoch_end_ && ascend_data_queue_->IsOpen()) {
642     TensorRow dummy_row;
643     if (!enable_prefetch_cache_pipeline_) {
644 #ifndef ENABLE_SECURITY
645       double start_time = 0;
646       if (is_profiling_enable) {
647         start_time = ProfilingTime::GetCurMilliSecond();
648       }
649 #endif
650       auto status = ascend_data_queue_->Push({});
651 #ifndef ENABLE_SECURITY
652       if (is_profiling_enable) {
653         double end_time = ProfilingTime::GetCurMilliSecond();
654         RETURN_UNEXPECTED_IF_NULL(tdt_cost);
655         *tdt_cost = static_cast<int32_t>(end_time - start_time);
656       }
657 #endif
658 
659       RETURN_IF_NOT_OK(CheckPushStatus(status, stop_send_, &send_finished_, is_break_loop));
660       MS_LOG(INFO) << "an epoch has already sent, now stop send data.";
661     } else {
662       PushEpochEndToQueue(channel_name_);
663     }
664     stop_send_ = true;
665   }
666   return Status::OK();
667 }
668 
WaitContinueSignal() const669 void DataQueueOp::WaitContinueSignal() const {
670   while (stop_send_ && ascend_keep_waiting_) {
671     MS_LOG(DEBUG) << "stop_send flag is set, waiting for continue signal...";
672     std::this_thread::sleep_for(std::chrono::microseconds(100));
673   }
674 }
675 
LimitSendingBatches(int64_t send_batch,int64_t * sending_num,const std::shared_ptr<ConfigManager> & cfg) const676 void DataQueueOp::LimitSendingBatches(int64_t send_batch, int64_t *sending_num,
677                                       const std::shared_ptr<ConfigManager> &cfg) const {
678   while (send_batch >= *sending_num) {
679     *sending_num = cfg->sending_batches();
680     if (*sending_num == 0) {
681       break;
682     }
683     std::this_thread::sleep_for(std::chrono::milliseconds(10));
684     MS_LOG(INFO) << "Wait for 10 milliseconds, as needed send batch is: " << *sending_num
685                  << ", and current sending batch is:" << send_batch;
686   }
687 }
688 
SendRowToTdt(TensorRow curr_row,bool is_profiling_enable,int32_t * tdt_cost)689 Status DataQueueOp::SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost) {
690   std::vector<device::DataQueueItem> items = ConvertTensorRowToDataQueueItem(curr_row);
691 #ifndef ENABLE_SECURITY
692   double start_time = 0;
693   if (is_profiling_enable) {
694     start_time = ProfilingTime::GetCurMilliSecond();
695   }
696 #endif
697   auto status = ascend_data_queue_->Push(items);
698 #ifndef ENABLE_SECURITY
699   if (is_profiling_enable) {
700     double end_time = ProfilingTime::GetCurMilliSecond();
701     RETURN_UNEXPECTED_IF_NULL(tdt_cost);
702     *tdt_cost = static_cast<int32_t>(end_time - start_time);
703   }
704 #endif
705   if (status != device::DataQueueStatus::SUCCESS) {
706     if (stop_send_) {
707       MS_LOG(INFO) << "stop_send received";
708       return Status::OK();
709     }
710     RETURN_STATUS_ERROR(
711       StatusCode::kMDTDTPushFailure,
712       "TDT Push data into device Failed, check the first error or TraceBack first, more checking advises are: "
713       "1) if training is not ready, error might raised by network computing operator or environment configuration. "
714       "2) other cases, checking info level log or search this error in mindspore's FAQ for detail solution.");
715   }
716   if (create_data_info_queue_) {
717     DATA_INFO data_info;
718     (void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
719                          [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
720     RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(std::move(data_info)));
721   }
722   return Status::OK();
723 }
724 
CheckPushStatus(DataQueueStatus status,bool stop_send,bool * send_finished,bool * is_break_loop)725 Status DataQueueOp::CheckPushStatus(DataQueueStatus status, bool stop_send, bool *send_finished, bool *is_break_loop) {
726   if (status != DataQueueStatus::SUCCESS) {
727     if (stop_send) {
728       *send_finished = true;
729       MS_LOG(INFO) << "stop_send received";
730       return Status::OK();
731     }
732     // when training stopped, handle might have been destroyed immediately
733     if (!ascend_data_queue_->IsOpen()) {
734       *is_break_loop = true;
735       MS_LOG(WARNING) << "Thread has already been terminated.";
736       return Status::OK();
737     }
738     RETURN_STATUS_ERROR(
739       StatusCode::kMDTDTPushFailure,
740       "TDT Push data into device Failed, check the first error or TraceBack first, more checking advises are: "
741       "1) if training is not ready, error might raised by network computing operator or environment configuration. "
742       "2) other cases, checking info level log or search this error in mindspore's FAQ for detail solution.");
743   }
744   return Status::OK();
745 }
746 
GetDataInfo(DATA_INFO * data_info)747 Status DataQueueOp::GetDataInfo(DATA_INFO *data_info) {
748 #ifdef WITH_BACKEND
749   RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
750   if (!create_data_info_queue_) {
751     RETURN_STATUS_UNEXPECTED("[Internal ERROR] DataInfo queue is not created.");
752   }
753   // This place has a race condition with operator(), so the first one
754   // arrive here will do the initialize work.
755   {
756     std::unique_lock<std::mutex> lock(data_info_mutex_);
757     if (data_info_queue_ptr_ == nullptr) {
758       data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
759       RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
760     }
761   }
762   RETURN_IF_NOT_OK(data_info_queue_ptr_->PopFront(data_info));
763 #endif
764   return Status::OK();
765 }
766 
GetMbufQueueSize(size_t * queue_size)767 Status DataQueueOp::GetMbufQueueSize(size_t *queue_size) {
768 #ifdef WITH_BACKEND
769   if (TaskManager::FindMe()->Interrupted()) {
770     RETURN_STATUS_ERROR(StatusCode::kMDTDTPushFailure,
771                         "DataQueueOp thread had been interrupted, please check for other error messages.");
772   }
773   if (device_type_ == DeviceType::Ascend && ascend_data_queue_->QueueType() == "Ascend_MBUF") {
774     *queue_size = ascend_data_queue_->QueryQueueSize();
775   } else {
776     *queue_size = 1;
777   }
778 #endif
779   return Status::OK();
780 }
781 
GetSendInfo()782 std::vector<std::vector<double>> DataQueueOp::GetSendInfo() {
783   std::vector<std::vector<double>> send_info_per_epoch;
784   (void)std::transform(send_summary_.begin(), send_summary_.end(), std::back_inserter(send_info_per_epoch),
785                        [](const SendInfo &send_info) -> std::vector<double> {
786                          return std::vector<double>{send_info.epoch, send_info.fetch_data_num,
787                                                     send_info.first_data_time, send_info.fetch_data_time};
788                        });
789   // history message control
790   constexpr auto kMaxInfoSize = 5;
791   while (send_info_per_epoch.size() > kMaxInfoSize) {
792     send_info_per_epoch.erase(send_info_per_epoch.begin());
793   }
794   return send_info_per_epoch;
795 }
796 
SetThreadDevice()797 Status DataQueueOp::SetThreadDevice() {
798 #ifdef WITH_BACKEND
799   (void)device::DataQueueMgr::GetInstance().SetThreadDevice(channel_name_);
800 #endif
801   return Status::OK();
802 }
803 
CreateDynamicDataQueue()804 Status DataQueueOp::CreateDynamicDataQueue() {
805 #ifdef WITH_BACKEND
806   if (dynamic_shape_) {
807     auto ret = device::DataQueueMgr::GetInstance().CreateDynamicBufQueue(channel_name_, kDynamicHostQueueCapacity);
808     if (ret != DataQueueStatus::SUCCESS && ret != DataQueueStatus::QUEUE_EXIST) {
809       RETURN_STATUS_ERROR(StatusCode::kMEFailed, "Create dynamic data queue failed");
810     }
811   }
812 #endif
813   return Status::OK();
814 }
815 
LaunchParallelCopyThread()816 Status DataQueueOp::LaunchParallelCopyThread() {
817 #ifdef WITH_BACKEND
818   RETURN_UNEXPECTED_IF_NULL(tree_);
819   // Every thread use cuda api should SetThreadDevice
820   RETURN_IF_NOT_OK(SetThreadDevice());
821   // CircularPool may not safe under multi-threads scenario, so one worker with one pool
822   for (int i = 0; i < num_workers_; i++) {
823     std::shared_ptr<MemoryPool> pool;
824     RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
825     RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(
826       &pool, -1, kDeviceQueGpuThreadMemory, false,
827       MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kGPUDevice));
828     pool_.push_back(pool);
829   }
830   gpu_connector_ = std::make_unique<GpuConnector>(num_workers_, 1, queue_capacity_);
831   receive_queues_.Init(num_workers_, queue_capacity_);
832   RETURN_IF_NOT_OK(receive_queues_.Register(tree_->AllTasks()));
833   RETURN_IF_NOT_OK(tree_->LaunchWorkers(static_cast<int>(num_workers_),
834                                         std::bind(&DataQueueOp::WorkerEntry, this, std::placeholders::_1),
835                                         Name() + "::WorkerEntry", id()));
836   RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue",
837                                                       std::bind(&DataQueueOp::PushDataToGPU, this), nullptr, id()));
838   if (enable_prefetch_cache_pipeline_) {
839     RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
840       "Push prefetch data to GPU queue", std::bind(&DataQueueOp::PushPrefetchDataToGPU, this), nullptr, id()));
841   }
842 
843 #endif
844   return Status::OK();
845 }
846 
NoExceptionRaised() const847 bool DataQueueOp::NoExceptionRaised() const {
848 #ifdef WITH_BACKEND
849   return !TaskManager::FindMe()->Interrupted() && !device::DataQueueMgr::GetInstance().IsClosed();
850 #else
851   return !TaskManager::FindMe()->Interrupted();
852 #endif
853 }
854 
PushDataToGPU()855 Status DataQueueOp::PushDataToGPU() {
856 #ifdef WITH_BACKEND
857   RETURN_UNEXPECTED_IF_NULL(tree_);
858   // Every thread use cuda api should SetThreadDevice
859   RETURN_IF_NOT_OK(SetThreadDevice());
860   TaskManager::FindMe()->Post();
861 #ifndef ENABLE_SECURITY
862   uint64_t batch_start_time = 0;
863   uint64_t end_time = 0;
864   uint64_t push_cost = 0;
865   std::shared_ptr<DeviceQueueTracing> profiling_node;
866   bool is_profiling_enable = GlobalContext::profiling_manager()->IsProfilingEnable(tree_);
867   if (is_profiling_enable) {
868     std::shared_ptr<Tracing> node;
869     RETURN_IF_NOT_OK(GlobalContext::profiling_manager()->GetTracingNode(kDeviceQueueTracingName, &node));
870     profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
871     batch_start_time = ProfilingTime::GetCurMilliSecond();
872   }
873 #endif
874 #ifdef ENABLE_DUMP_IR
875   md_channel_info_->RecordBatchQueue(gpu_connector_->size());
876   md_channel_info_->RecordPreprocessBatch(0);
877 #endif
878   GpuConnectorItem item;
879   RETURN_IF_NOT_OK(gpu_connector_->Pop(0, &item));
880   auto items = std::move(item.data_item);
881   bool eoe_flag = item.eoe_flag;
882   int64_t send_batch = 0;
883   auto release_function = std::bind(&DataQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
884   auto ret = device::DataQueueMgr::GetInstance().Open(channel_name_, release_function);
885   if (ret != DataQueueStatus::SUCCESS) {
886     RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to open channel for sending data.");
887   }
888 
889   while (!(items.empty() && !eoe_flag) && !device::DataQueueMgr::GetInstance().IsClosed()) {
890     if (!eoe_flag) {
891 #ifdef ENABLE_DUMP_IR
892       md_channel_info_->RecordBatchQueue(gpu_connector_->size());
893       md_channel_info_->RecordPreprocessBatch(send_batch);
894       md_channel_info_->RecordPushStartTime();
895 #endif
896       // Data prefetch only when PS mode enables cache.
897       RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "PushToGPU"));
898       if (!enable_prefetch_cache_pipeline_) {
899         RETURN_IF_NOT_OK(RetryPushData(items, is_profiling_enable, &push_cost));
900       } else {
901         PushDataToGPUCacheQueue(std::move(items));
902       }
903       RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "PushToGPU", {{"TensorRowFlags", "Data"}}));
904 #ifndef ENABLE_SECURITY
905       ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, push_cost, &batch_start_time, &end_time,
906                         gpu_connector_->capacity(), gpu_connector_->size());
907 #endif
908       send_batch++;
909       MS_LOG(INFO) << "Have sent " << send_batch << " batch(es) to device, channel name: " << channel_name_;
910 #ifdef ENABLE_DUMP_IR
911       md_channel_info_->RecordBatchQueue(gpu_connector_->size());
912       md_channel_info_->RecordPreprocessBatch(send_batch);
913       md_channel_info_->RecordPushEndTime();
914 #endif
915       if (total_batch_ > 0 && send_batch >= total_batch_) {
916         break;
917       }
918     } else {
919 #ifndef ENABLE_SECURITY
920       if (is_profiling_enable) {
921         tree_->SetEpochEnd();
922         GlobalContext::profiling_manager()->RecordEndOfEpoch(send_batch);
923       }
924 #endif
925     }
926     if (NoExceptionRaised()) {
927       auto rc = gpu_connector_->Pop(0, &item);
928       items = std::move(item.data_item);
929       eoe_flag = item.eoe_flag;
930       // If the batches send by dataset are more than gpu calculate, gpu will core for no signal notify.
931       if (rc.IsError()) {
932         device::DataQueueMgr::GetInstance().Close(channel_name_);
933         device::DataQueueMgr::GetInstance().CloseConfirm();
934         return rc;
935       }
936     } else {
937       break;
938     }
939   }
940 
941   if (enable_prefetch_cache_pipeline_) {
942     // Send eof to cache queue
943     PushFileEndToQueue(channel_name_);
944   }
945 
946   // now we use this flag to judge whether exception raised.
947   if (NoExceptionRaised()) {
948     send_finished_ = true;
949   }
950   tree_->SetFinished();
951   MS_LOG(INFO) << "ExecutionTree finished. Device queue sent number of batches: " << send_batch;
952 
953   device::DataQueueMgr::GetInstance().Close(channel_name_);
954   device::DataQueueMgr::GetInstance().CloseConfirm();
955   return Status::OK();
956 }
957 
958 // WorkEntry of DataQueueOp just do multi_threads memcpy for performance optimization.
WorkerEntry(int32_t worker_id)959 Status DataQueueOp::WorkerEntry(int32_t worker_id) {
960   // Every thread use cuda api should SetThreadDevice
961   RETURN_IF_NOT_OK(SetThreadDevice());
962   TaskManager::FindMe()->Post();
963   TensorRow current_row;
964   uint32_t batch_num = 0;
965   RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
966   while (!current_row.quit() && !device::DataQueueMgr::GetInstance().IsClosed()) {
967     GpuConnectorItem connector_item = {{}, current_row.eoe()};
968     if (!connector_item.eoe_flag) {
969       std::vector<device::DataQueueItem> items;
970       for (auto &i : current_row) {
971         device::DataQueueItem data_item;
972         data_item.data_len = static_cast<size_t>(i->SizeInBytes());
973         data_item.shapes = i->shape().AsVector();
974         data_item.data_ptr = nullptr;
975         data_item.worker_id = worker_id;
976         items.push_back(data_item);
977       }
978 
979       RETURN_IF_NOT_OK(MallocForGPUData(&items, current_row, worker_id));
980       connector_item.data_item = std::move(items);
981       batch_num++;
982     } else {
983       MS_LOG(INFO) << "EOE Detected";
984     }
985     RETURN_IF_NOT_OK(gpu_connector_->Add(worker_id, std::move(connector_item)));
986     RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
987   }
988 
989   MS_LOG(INFO) << "Device queue worker id " << worker_id << " processed number of batches: " << batch_num;
990   // Add empty data_item vector with eoe_flag=false as quit flag.
991   GpuConnectorItem connector_item = {{}, false};
992   RETURN_IF_NOT_OK(gpu_connector_->Add(worker_id, std::move(connector_item)));
993 #endif
994   return Status::OK();
995 }
996 
SendDataToGPU()997 Status DataQueueOp::SendDataToGPU() {
998 #ifdef WITH_BACKEND
999   RETURN_IF_NOT_OK(LaunchParallelCopyThread());
1000   MS_LOG(INFO) << "Device queue, sending data to GPU.";
1001 #ifndef ENABLE_SECURITY
1002   uint64_t batch_record_start;
1003   uint64_t batch_record_end;
1004   batch_record_start = ProfilingTime::GetCurMilliSecond();
1005 #endif
1006   TensorRow current_row;
1007   RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
1008   first_fetch_flag_ = true;
1009   int64_t num_buf = 0;
1010   bool is_break_loop = false;
1011 
1012   MS_LOG(INFO) << "Begin to send data to device, channel name: " << channel_name_;
1013 
1014   while (!current_row.eof() && !is_break_loop && !device::DataQueueMgr::GetInstance().IsClosed()) {
1015     SendInfo send_info_per_epoch;
1016     send_info_per_epoch.epoch = op_current_epochs_ + 1;
1017     send_summary_.push_back(send_info_per_epoch);
1018     while (!current_row.eoe() && !is_break_loop && !device::DataQueueMgr::GetInstance().IsClosed()) {
1019       RETURN_IF_NOT_OK(FilterMetadata(&current_row));
1020       RETURN_IF_NOT_OK(CheckExceptions(current_row));
1021 #ifndef ENABLE_SECURITY
1022       DetectPerBatchTime(&batch_record_start, &batch_record_end);
1023 #endif
1024 
1025       if (create_data_info_queue_) {
1026         DATA_INFO data_info;
1027         (void)std::transform(current_row.begin(), current_row.end(), std::back_inserter(data_info),
1028                              [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
1029         RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
1030       }
1031 
1032       PrintBeginInfoWhenFirstBatch(first_push_flag_);
1033       RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_row)));
1034       PrintEndInfoWhenFirstBatch(&first_push_flag_);
1035 #ifndef ENABLE_SECURITY
1036       batch_record_start = ProfilingTime::GetCurMilliSecond();
1037 #endif
1038       if (NoExceptionRaised()) {
1039         RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
1040       } else {
1041         is_break_loop = true;
1042       }
1043     }
1044     UpdateRepeatAndEpochCounter();
1045     if (current_row.eoe()) {
1046       MS_LOG(INFO) << "EOE Detected";
1047       TensorRow eoe_flag(TensorRow::kFlagEOE);
1048       RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(eoe_flag)));
1049     }
1050 
1051     if (NoExceptionRaised()) {
1052       RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
1053     } else {
1054       is_break_loop = true;
1055     }
1056   }
1057 
1058   for (uint32_t index = 0; index < num_workers_; index++) {
1059     MS_LOG(INFO) << "Adding quit flag to Workers";
1060     TensorRow quit_flag(TensorRow::kFlagQuit);
1061     RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit_flag)));
1062   }
1063 
1064   MS_LOG(INFO) << "Device queue received number of batches and EOEs: " << (num_buf - num_workers_);
1065 #else
1066   MS_LOG(WARNING) << "Gpu queue is not supported in ut tests.";
1067 #endif
1068   return Status::OK();
1069 }
1070 
MallocForGPUData(std::vector<device::DataQueueItem> * items,const TensorRow & curr_row,const int32_t & worker_id)1071 Status DataQueueOp::MallocForGPUData(std::vector<device::DataQueueItem> *items, const TensorRow &curr_row,
1072                                      const int32_t &worker_id) {
1073   size_t i = 0;
1074   for (auto &sub_item : *items) {
1075     auto rc = pool_[static_cast<size_t>(worker_id)]->Allocate(sub_item.data_len, &sub_item.data_ptr);
1076     if (rc.IsError() || sub_item.data_ptr == nullptr) {
1077       RETURN_STATUS_OOM("Memory malloc failed, check memory usage.");
1078     }
1079     if (curr_row[i] == nullptr) {
1080       MS_LOG(ERROR) << "[Internal ERROR] The pointer curr_row[" << i << "] is null";
1081       RETURN_STATUS_UNEXPECTED("[Internal ERROR] TensorRow 'curr_row' contains nullptr.");
1082     }
1083     sub_item.data_type = curr_row[i]->type().ToString();
1084     const unsigned char *column_data = curr_row[i]->GetBuffer();
1085     if (memcpy_s(sub_item.data_ptr, sub_item.data_len, column_data,
1086                  static_cast<uint32_t>(curr_row[i++]->SizeInBytes())) != 0) {
1087       MS_LOG(ERROR) << "[Internal ERROR] memcpy_s failed.";
1088       RETURN_STATUS_UNEXPECTED("[Internal ERROR] memcpy_s failed.");
1089     }
1090   }
1091 
1092   return Status::OK();
1093 }
1094 
ClearDevice()1095 Status DataQueueOp::ClearDevice() {
1096 #ifdef WITH_BACKEND
1097   MS_LOG(INFO) << "Clearing the data in GPU device: " << device_id_ << " channel: " << channel_name_;
1098   auto release_function = std::bind(&DataQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
1099   auto ret = device::DataQueueMgr::GetInstance().Open(channel_name_, release_function);
1100   if (ret != DataQueueStatus::SUCCESS) {
1101     RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to open channel for clearing the device.");
1102   }
1103 
1104   ret = device::DataQueueMgr::GetInstance().Clear(channel_name_);
1105   CHECK_FAIL_RETURN_UNEXPECTED(ret == DataQueueStatus::SUCCESS, "Failed to clear the device.");
1106   device::DataQueueMgr::GetInstance().Close(channel_name_);
1107   device::DataQueueMgr::GetInstance().CloseConfirm();
1108 #endif
1109   return Status::OK();
1110 }
1111 
SendDataToCPU()1112 Status DataQueueOp::SendDataToCPU() {
1113   MS_LOG(INFO) << "Device queue, sending data to CPU.";
1114   int64_t total_batch = 0;
1115 
1116   while (!(child_iterator_->EofHandled())) {
1117     TensorRow curr_row;
1118     RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
1119 
1120     if (!first_fetch_flag_) {
1121       first_fetch_flag_ = true;
1122     }
1123     if (!curr_row.empty()) {
1124       for (auto &tensor : curr_row) {
1125         MS_LOG(DEBUG) << "Feature size is " << tensor->SizeInBytes() << ".";
1126       }
1127       total_batch++;
1128       if (stop_send_) {
1129         break;
1130       }
1131     }
1132   }
1133 
1134   MS_LOG(INFO) << "Device queue total batch is " << total_batch << ".";
1135 
1136   return Status::OK();
1137 }
1138 
Print(std::ostream & out,bool show_all) const1139 void DataQueueOp::Print(std::ostream &out, bool show_all) const {
1140   if (!show_all) {
1141     // Call the super class for displaying any common 1-liner info
1142     PipelineOp::Print(out, show_all);
1143     // Then show any custom derived-internal 1-liner info for this op
1144     out << "\n";
1145   } else {
1146     // Call the super class for displaying any common detailed info
1147     PipelineOp::Print(out, show_all);
1148     // Then show any custom derived-internal stuff
1149     out << "\nChannel name: " << channel_name_ << "\n\n";
1150   }
1151 }
1152 
1153 #ifndef ENABLE_SECURITY
ProfilingRecorder(bool is_profiling_enable,const std::shared_ptr<DeviceQueueTracing> & profiling_node,int64_t send_batch,int32_t tdt_cost,uint64_t * batch_start_time,uint64_t * end_time,int32_t connector_capacity,int32_t connector_size) const1154 void DataQueueOp::ProfilingRecorder(bool is_profiling_enable, const std::shared_ptr<DeviceQueueTracing> &profiling_node,
1155                                     int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time,
1156                                     uint64_t *end_time, int32_t connector_capacity, int32_t connector_size) const {
1157   // Record the pipeline profiling info
1158   if (is_profiling_enable) {
1159     *end_time = ProfilingTime::GetCurMilliSecond();
1160     // record push tdt time
1161     profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch + 1, tdt_cost, *end_time);
1162     int32_t batch_cost = (int32_t)(*end_time - *batch_start_time);
1163     // record batch time
1164     profiling_node->Record(TIME, BATCH_TIME, send_batch + 1, batch_cost, *end_time);
1165     // record pipeline time
1166     profiling_node->Record(TIME, PIPELINE_TIME, send_batch + 1, batch_cost - tdt_cost, *end_time);
1167     *batch_start_time = *end_time;
1168     // record connector depth
1169     profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch + 1, connector_size, *end_time);
1170   }
1171 }
1172 
DetectFirstBatch()1173 Status DataQueueOp::DetectFirstBatch() {
1174   TaskManager::FindMe()->Post();
1175   uint8_t count_num = 0;
1176   uint64_t temp_start_time = ProfilingTime::GetCurMilliSecond();
1177   constexpr int check_interval = 200;
1178   while (true) {
1179     RETURN_IF_INTERRUPTED();
1180     std::this_thread::sleep_for(std::chrono::milliseconds(check_interval));
1181     uint64_t temp_end_time = ProfilingTime::GetCurMilliSecond();
1182     // if fetch first batch, or detect 3 or more times and unable fetch first batch, exist with already printed Warning
1183     if (first_fetch_flag_ == true || count_num > 2) {
1184       break;
1185     } else if (temp_end_time - temp_start_time > kTimeOutMilliSeconds) {
1186       count_num++;
1187       MS_LOG(WARNING) << "Bad performance attention, it waits more than " +
1188                            std::to_string(kTimeOutMilliSeconds / 1000) +
1189                            " seconds and unable to fetch first Batch of "
1190                            "data from dataset pipeline, which might result `GetNext` timeout problem. You may test "
1191                            "dataset processing performance (with creating dataset iterator) and optimize it. Notes: "
1192                            "shuffle operation is turn on for loading Dataset in default, which may effect first batch "
1193                            "loading time.";
1194     }
1195   }
1196   return Status::OK();
1197 }
1198 
DetectPerBatchTime(const uint64_t * start_time,uint64_t * end_time)1199 void DataQueueOp::DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time) {
1200   *end_time = ProfilingTime::GetCurMilliSecond();
1201   auto interval = *end_time - *start_time;
1202   constexpr auto kTimeMilliSeconds = 1000.;
1203   send_summary_.back().record_data(interval / kTimeMilliSeconds);
1204   if (interval > kTimeOutMilliSeconds) {
1205     MS_LOG(WARNING) << "Bad performance attention, it takes more than " + std::to_string(kTimeOutMilliSeconds / 1000) +
1206                          " seconds to fetch a batch of data from dataset "
1207                          "pipeline, which might result `GetNext` timeout problem. You may test dataset processing"
1208                          " performance(with creating dataset iterator) and optimize it.";
1209   }
1210 }
1211 #endif
1212 
PrintBeginInfoWhenFirstBatch(const bool & first_push_flag) const1213 void DataQueueOp::PrintBeginInfoWhenFirstBatch(const bool &first_push_flag) const {
1214   if (first_push_flag != true) {
1215     MS_LOG(INFO) << "Loading dataset and begin to push first batch into device ...";
1216   }
1217 }
1218 
PrintEndInfoWhenFirstBatch(bool * first_push_flag) const1219 void DataQueueOp::PrintEndInfoWhenFirstBatch(bool *first_push_flag) const {
1220   if (!first_push_flag) {
1221     MS_LOG(WARNING) << "First batch flag: first_push_flag is nullptr";
1222     return;
1223   }
1224   if (*first_push_flag != true) {
1225     MS_LOG(INFO) << "Loading dataset and push first batch into device successful.";
1226     *first_push_flag = true;
1227   }
1228 }
1229 
RetryPushData(const std::vector<DataQueueItem> & items,const bool profiling,uint64_t * push_time)1230 Status DataQueueOp::RetryPushData(const std::vector<DataQueueItem> &items, const bool profiling, uint64_t *push_time) {
1231 #ifdef WITH_BACKEND
1232   bool flag_log = false;
1233 #ifndef ENABLE_SECURITY
1234   uint64_t start_time = 0;
1235   if (profiling) {
1236     start_time = ProfilingTime::GetCurMilliSecond();
1237   }
1238 #endif
1239   while (!device::DataQueueMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) {
1240     DataQueueStatus ret = device::DataQueueMgr::GetInstance().Push(channel_name_, items, WAIT_TIME);
1241     if (ret != DataQueueStatus::SUCCESS) {
1242       if (ret == DataQueueStatus::ERROR_INPUT) {
1243         device::DataQueueMgr::GetInstance().Close(channel_name_);
1244         device::DataQueueMgr::GetInstance().CloseConfirm();
1245         return Status(
1246           StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
1247           "Invalid data, the types or shapes of current row is different with previous row(i.e. do batch operation but "
1248           "drop_reminder is False, or without resize image into the same size, these will cause shapes differs).");
1249       } else {
1250         if (!stop_send_) {
1251           if (!flag_log) {
1252             MS_LOG(DEBUG) << "Retry pushing data...";
1253             flag_log = true;
1254           }
1255           continue;
1256         }
1257         break;
1258       }
1259     } else {
1260       break;
1261     }
1262   }
1263 #ifndef ENABLE_SECURITY
1264   if (profiling) {
1265     *push_time = ProfilingTime::GetCurMilliSecond() - start_time;
1266   }
1267 #endif
1268 #endif
1269   return Status::OK();
1270 }
1271 
SendDataToAscendDynamic()1272 Status DataQueueOp::SendDataToAscendDynamic() {
1273 #ifdef WITH_BACKEND
1274   MS_LOG(DEBUG) << "Dynamic Data queue, sending data to Ascend.";
1275   int64_t send_batch = 0;
1276   uint64_t data_queue_cost = 0;
1277   bool is_break_loop = false;
1278 
1279   RETURN_IF_NOT_OK(CreateDynamicDataQueue());
1280   std::function<void(void *, int32_t)> release_function([](void *, int32_t) { return; });
1281   auto ret = device::DataQueueMgr::GetInstance().Open(channel_name_, release_function);
1282   if (ret != DataQueueStatus::SUCCESS) {
1283     return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
1284                   "[Internal ERROR] Failed to open channel for sending data.");
1285   }
1286 
1287   TensorRow curr_row;
1288   RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
1289   first_fetch_flag_ = true;
1290 
1291   while (!curr_row.eof() && !is_break_loop) {
1292     while (!curr_row.eoe() && !is_break_loop) {
1293       RETURN_IF_NOT_OK(FilterMetadata(&curr_row));
1294       RETURN_IF_NOT_OK(CheckExceptions(curr_row));
1295       std::vector<device::DataQueueItem> items = ConvertTensorRowToDataQueueItem(curr_row);
1296       RETURN_IF_NOT_OK(RetryPushData(items, false, &data_queue_cost));
1297       if (create_data_info_queue_) {
1298         DATA_INFO data_info;
1299         (void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
1300                              [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
1301         RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
1302       }
1303       send_batch++;
1304       if (total_batch_ > 0 && send_batch >= total_batch_) {
1305         is_break_loop = true;
1306         break;
1307       }
1308 
1309       RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
1310     }
1311 
1312     if (curr_row.eoe() && send_epoch_end_) {
1313       MS_LOG(INFO) << "an epoch has already sent, now stop send data.";
1314       stop_send_ = true;
1315     }
1316 
1317     RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
1318   }
1319 
1320   // now we use this flag to judge whether exception raised.
1321   if (stop_send_ || !TaskManager::FindMe()->Interrupted()) {
1322     send_finished_ = true;
1323   }
1324   tree_->SetFinished();
1325   MS_LOG(INFO) << "ExecutionTree finished. Device queue sent number of batches: " << send_batch;
1326 
1327   device::DataQueueMgr::GetInstance().Close(channel_name_);
1328   device::DataQueueMgr::GetInstance().CloseConfirm();
1329 #endif
1330   return Status::OK();
1331 }
1332 }  // namespace dataset
1333 }  // namespace mindspore
1334