/**
 * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "minddata/dataset/engine/datasetops/device_queue_op.h"

#include <algorithm>
#include <iostream>
#include <memory>
#include <unordered_map>

#include "minddata/dataset/engine/dataset_iterator.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
                             bool send_epoch_end, int32_t total_batch, bool create_data_info_queue)
    : PipelineOp(1),
      channel_name_(channel_name),
      device_type_(device_type),
      device_id_(device_id),
      prefetch_size_(prefetch_size),
      send_epoch_end_(send_epoch_end),
      stop_send_(false),
      send_finished_(false),
      total_batch_(total_batch),
      create_data_info_queue_(create_data_info_queue),
      data_info_queue_ptr_(nullptr),
      first_fetch_flag_(false),
      first_push_flag_(false) {
#ifdef ENABLE_GPUQUE
  // Get the total device num of current machine
  int32_t device_count = 0;
  cudaGetDeviceCount(&device_count);
  std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
  rank_id_ = cfg->rank_id();  // Get the current rank_id
  if (device_count > 0) {
    rank_id_ = rank_id_ % device_count;
  }
  // Be careful when modifying num_workers_ and queue_capacity_: each worker owns one
  // circular_pool with 1 GB of pinned memory, so we suggest keeping num_workers_ * queue_capacity_
  // no greater than 16 to avoid memory overload.
  num_workers_ = kDeviceQueGpuNumThreads;
  queue_capacity_ = kDeviceQueGpuQueueCapacity;
#endif
#ifdef ENABLE_TDTQUE
  ascend_keep_waiting_ = true;
  tdtInstancePtr = std::make_shared<TdtPlugin>(channel_name_, device_id_);
#endif
#ifdef ENABLE_DUMP_IR
  md_channel_info_ = std::make_shared<MDChannelInfo>(channel_name_);
#endif
}

DeviceQueueOp::~DeviceQueueOp() {
#ifdef ENABLE_DUMP_IR
  std::string rdr_msg = md_channel_info_->ToString();
  if (!send_finished_ && !rdr_msg.empty()) {
    MS_LOG(WARNING) << rdr_msg;
  }
#endif
}

#ifdef ENABLE_GPUQUE
void DeviceQueueOp::ReleaseData(void *addr, int32_t worker_id) {
  if (addr != nullptr) {
    pool_[worker_id]->Deallocate(addr);
  }
}
#endif

Status DeviceQueueOp::EoeReceived(int32_t worker_id) {
  state_ = OpState::kDeOpIdle;
  return Status::OK();
}

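// Drop meta columns (names starting with kDftMetaColumnPrefix) from the row so that only
// user-visible data columns are sent to the device.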
Status DeviceQueueOp::FilterMetadata(TensorRow *row) {
  std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
  TensorRow output;
  TensorRow tmp = *row;
  std::vector<size_t> to_keep_indices;
  for (auto column : current_name_id_map) {
    std::string column_name = column.first;
    // Filter out meta columns, i.e. columns whose names start with kDftMetaColumnPrefix
    size_t pos = column_name.find(kDftMetaColumnPrefix);
    if (pos != std::string::npos && pos == 0) {
      continue;
    }
    to_keep_indices.push_back(column.second);
  }
  if (to_keep_indices.size() == 0) {
    std::string err_msg = "No effective column found, maybe all columns are meta columns and will be filtered. ";
    err_msg += "If you want to output a meta column, please rename it so that its name does not start with ";
    err_msg += "\"" + std::string(kDftMetaColumnPrefix) + "\"";
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  std::sort(to_keep_indices.begin(), to_keep_indices.end());
  (void)std::transform(to_keep_indices.begin(), to_keep_indices.end(), std::back_inserter(output),
                       [&tmp](const auto &it) { return std::move(tmp[it]); });
  *row = std::move(output);
  return Status::OK();
}

Status DeviceQueueOp::CheckExceptions(const TensorRow &row) const {
  // this method checks if the row meets the conditions to be sent to TDT
  for (const auto &item : row) {
    CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(), "Invalid data, cannot send string tensor to device.");
    CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Invalid data, cannot send tensor with no data to device.");
  }
  return Status::OK();
}

Status DeviceQueueOp::operator()() {
#ifndef ENABLE_SECURITY
  RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
    "Detect first batch", std::bind(&DeviceQueueOp::DetectFirstBatch, this), nullptr, id()));
#endif
  TaskManager::FindMe()->Post();
  child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);

#ifdef ENABLE_DUMP_IR
  if (md_channel_info_ == nullptr) {
    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "[Internal ERROR] RDR module init failed.");
  }
#endif
  if (device_type_ == DeviceType::Ascend) {
#ifdef ENABLE_TDTQUE
    if (create_data_info_queue_) {
      // This races with GetDataInfo(); whichever thread arrives first performs the
      // initialization of the data-info queue.
      {
        std::unique_lock<std::mutex> lock(data_info_mutex_);
        if (data_info_queue_ptr_ == nullptr) {
          data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
          RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
        }
      }
    }
    if (tdtInstancePtr->acl_handle_ == nullptr) {
      RETURN_STATUS_UNEXPECTED("Create channel for sending data failed, please check DEVICE ID setting.");
    }
    RETURN_IF_NOT_OK(SendDataToAscend());
#endif
  } else if (device_type_ == DeviceType::GPU) {
#ifdef ENABLE_GPUQUE
    RETURN_IF_NOT_OK(SendDataToGPU());
#endif
  } else if (device_type_ == DeviceType::CPU) {
    RETURN_IF_NOT_OK(SendDataToCPU());
  }

  return Status::OK();
}

#ifdef ENABLE_TDTQUE
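// Main send loop for Ascend: fetch rows from the child iterator, strip meta columns, push each
// row to the device through the TDT channel, and optionally push an end-of-sequence marker at
// each EOE when send_epoch_end_ is set.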
Status DeviceQueueOp::SendDataToAscend() {
  MS_LOG(INFO) << "Device queue, sending data to Ascend.";
#ifndef ENABLE_SECURITY
  uint64_t batch_start_time = 0;
  uint64_t end_time = 0;
  uint64_t batch_record_start = 0;
  uint64_t batch_record_end = 0;
#endif
  int64_t send_batch = 0;
  int32_t tdt_cost = 0;
#ifndef ENABLE_SECURITY
  int32_t connector_size = 0;
  int32_t connector_capacity = 0;
#endif
  bool is_break_loop = false;

  std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
  int64_t sending_num = cfg->sending_batches();  // Get the current sending_num

#ifndef ENABLE_SECURITY
  std::shared_ptr<DeviceQueueTracing> profiling_node;
  bool is_profiling_enable = tree_->GetProfilingManager()->IsProfilingEnable();
  if (is_profiling_enable) {
    std::shared_ptr<Tracing> node;
    RETURN_IF_NOT_OK(tree_->GetProfilingManager()->GetTracingNode(kDeviceQueueTracingName, &node));
    profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
    batch_start_time = ProfilingTime::GetCurMilliSecond();
    connector_capacity = ChildOpConnectorCapacity();
  }
#else
  bool is_profiling_enable = false;
#endif
#ifdef ENABLE_DUMP_IR
  md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
  md_channel_info_->RecordPreprocessBatch(0);
#endif
#ifndef ENABLE_SECURITY
  batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
  TensorRow curr_row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
  first_fetch_flag_ = true;
  while (!curr_row.eof() && !is_break_loop) {
    while (!curr_row.eoe() && !is_break_loop) {
      RETURN_IF_NOT_OK(FilterMetadata(&curr_row));
      RETURN_IF_NOT_OK(CheckExceptions(curr_row));
      WaitContinueSignal();
#ifdef ENABLE_DUMP_IR
      md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
      md_channel_info_->RecordPreprocessBatch(send_batch);
      md_channel_info_->RecordPushStartTime();
#endif
#ifndef ENABLE_SECURITY
      DetectPerBatchTime(&batch_record_start, &batch_record_end);
#endif
      PrintBeginInfoWhenFirstBatch(first_push_flag_);
      RETURN_IF_NOT_OK(SendRowToTdt(curr_row, is_profiling_enable, &tdt_cost));
      PrintEndInfoWhenFirstBatch(&first_push_flag_);
#ifndef ENABLE_SECURITY
      ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
                        connector_capacity, connector_size);
      batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
      send_batch++;
#ifdef ENABLE_DUMP_IR
      md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
      md_channel_info_->RecordPreprocessBatch(send_batch);
      md_channel_info_->RecordPushEndTime();
#endif

      if (total_batch_ > 0 && send_batch >= total_batch_) {
        is_break_loop = true;
        break;
      }

      // Wait while the configured sending limit is non-zero and no larger than the number of
      // batches already sent.
      LimitSendingBatches(send_batch, &sending_num, cfg);

#ifndef ENABLE_SECURITY
      if (is_profiling_enable) {
        connector_size = ChildOpConnectorSize();
        connector_capacity = ChildOpConnectorCapacity();
      }
#endif
      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
    }
    if (curr_row.eoe() && send_epoch_end_) {
      TensorRow dummy_row;
      auto status = tdtInstancePtr->hostPush(dummy_row, true, channel_name_, is_profiling_enable, tdt_cost,
                                             ACL_TENSOR_DATA_END_OF_SEQUENCE);
      if (status != Status::OK()) {
        if (stop_send_) {
          send_finished_ = true;
          MS_LOG(INFO) << "stop_send received";
          return Status::OK();
        }
        return Status(StatusCode::kMDTDTPushFailure,
                      "TDT push data into device failed. Check the first error or the traceback first. Possible"
                      " checks: 1) if training is not ready and is still in the network graph compiling stage, check"
                      " errors raised by the operators used in the network or the environment configuration. 2) if"
                      " training is interrupted midway, check whether the dataset sending count and the network"
                      " training count mismatch. 3) if this error is raised at the end of training, ignore it. 4) in"
                      " other cases, check the Ascend host log or the info log, or search for this error in"
                      " MindSpore's FAQ.");
      }
      MS_LOG(INFO) << "An epoch has been sent; stop sending data.";
      stop_send_ = true;
    }
#ifndef ENABLE_SECURITY
    if (is_profiling_enable) {
      connector_size = ChildOpConnectorSize();
      connector_capacity = ChildOpConnectorCapacity();
      tree_->SetEpochEnd();
    }
#endif
    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
  }

  // Use this flag to judge whether an exception was raised.
  if (stop_send_ || !TaskManager::FindMe()->Interrupted()) {
    send_finished_ = true;
  }
  tree_->SetFinished();
  MS_LOG(INFO) << "Device queue sent " << send_batch << " batches.";

  return Status::OK();
}

void DeviceQueueOp::WaitContinueSignal() const {
  while (stop_send_ && ascend_keep_waiting_) {
    MS_LOG(DEBUG) << "stop_send flag is set, waiting for continue signal...";
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  }
}

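// Throttle the send loop: block while the number of batches already sent has reached the
// configured limit, re-reading cfg->sending_batches() every 10 ms. A limit of 0 means no limit.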
void DeviceQueueOp::LimitSendingBatches(int64_t send_batch, int64_t *sending_num, std::shared_ptr<ConfigManager> cfg) {
  while (send_batch >= *sending_num) {
    *sending_num = cfg->sending_batches();
    if (*sending_num == 0) {
      break;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    MS_LOG(INFO) << "Waited for 10 milliseconds; the needed send batch is: " << *sending_num
                 << ", and the current sending batch is: " << send_batch;
  }
}

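// Push a single row to the device through the TDT channel; on success, optionally record the
// (type, shape) info of each tensor into the data-info queue for GetDataInfo().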
Status DeviceQueueOp::SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost) {
  auto status = tdtInstancePtr->hostPush(curr_row, true, channel_name_, is_profiling_enable, *tdt_cost);
  if (status != Status::OK()) {
    if (stop_send_) {
      MS_LOG(INFO) << "stop_send received";
      return Status::OK();
    }
    return Status(StatusCode::kMDTDTPushFailure,
                  "TDT push data into device failed. Check the first error or the traceback first. Possible checks:"
                  " 1) if training is not ready and is still in the network graph compiling stage, check errors"
                  " raised by the operators used in the network or the environment configuration. 2) if training is"
                  " interrupted midway, check whether the dataset sending count and the network training count"
                  " mismatch. 3) if this error is raised at the end of training, ignore it. 4) in other cases, check"
                  " the Ascend host log or the info log, or search for this error in MindSpore's FAQ.");
  }
  if (create_data_info_queue_) {
    DATA_INFO data_info;
    (void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
                         [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
    RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
  }
  return Status::OK();
}
#endif

#ifdef ENABLE_TDTQUE
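// Pop the (type, shape) info recorded for a previously sent row. The data-info queue is created
// lazily here because this call may race with operator().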
Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
  if (!create_data_info_queue_) {
    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "DataInfo queue is not created.");
  }
  // This races with operator(); whichever thread arrives first performs the initialization.
  {
    std::unique_lock<std::mutex> lock(data_info_mutex_);
    if (data_info_queue_ptr_ == nullptr) {
      data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
      RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
    }
  }
  RETURN_IF_NOT_OK(data_info_queue_ptr_->PopFront(data_info));
  return Status::OK();
}
#else
Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
  return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "GetDataInfo is not supported yet.");
}
#endif

#ifdef ENABLE_GPUQUE
Status DeviceQueueOp::SetThreadDevice() {
  // Without cudaSetDevice, CUDA memory would be allocated on GPU:0 by default,
  // which overloads that device in the distributed scenario.
  auto ret = cudaSetDevice(rank_id_);
  if (ret != cudaSuccess) {
    std::string err;
    err += "cudaSetDevice failed, ret[";
    err += std::to_string(static_cast<int>(ret));
    err += "], ";
    err += cudaGetErrorString(ret);
    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, err);
  }
  return Status::OK();
}

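// Set up the GPU send pipeline: one memory pool and one receive queue per worker, a connector
// that gathers the copied items, the worker threads, and the PushDataToGPU task.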
Status DeviceQueueOp::LaunchParallelCopyThread() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  // Every thread that uses the CUDA API should call SetThreadDevice
  RETURN_IF_NOT_OK(SetThreadDevice());
  // CircularPool may not be safe under a multi-threaded scenario, so each worker gets its own pool
  for (int i = 0; i < num_workers_; i++) {
    std::shared_ptr<MemoryPool> pool;
    RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool, -1, kDeviceQueGpuThreadMemory, false, true));
    pool_.push_back(pool);
  }
  gpu_item_connector_ = std::make_unique<GpuItemConnector>(num_workers_, 1, queue_capacity_);
  receive_queues_.Init(num_workers_, queue_capacity_);
  RETURN_IF_NOT_OK(receive_queues_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(
    tree_->LaunchWorkers(num_workers_, std::bind(&DeviceQueueOp::WorkerEntry, this, std::placeholders::_1), "", id()));
  RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue",
                                                      std::bind(&DeviceQueueOp::PushDataToGPU, this), nullptr, id()));

  return Status::OK();
}

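// Consumer side of the GPU pipeline: pop copied items from the connector and push them into the
// GPU buffer channel, recording profiling and RDR information along the way.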
Status DeviceQueueOp::PushDataToGPU() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  // Every thread that uses the CUDA API should call SetThreadDevice
  RETURN_IF_NOT_OK(SetThreadDevice());
  TaskManager::FindMe()->Post();
#ifndef ENABLE_SECURITY
  uint64_t batch_start_time = 0;
  int32_t push_cost = 0;
  int32_t connector_size = 0;
  int32_t connector_capacity = 0;
  std::shared_ptr<DeviceQueueTracing> profiling_node;
  bool is_profiling_enable = tree_->GetProfilingManager()->IsProfilingEnable();
  if (is_profiling_enable) {
    std::shared_ptr<Tracing> node;
    RETURN_IF_NOT_OK(tree_->GetProfilingManager()->GetTracingNode(kDeviceQueueTracingName, &node));
    profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
    batch_start_time = ProfilingTime::GetCurMilliSecond();
    connector_capacity = gpu_item_connector_->capacity();
  }
#endif
#ifdef ENABLE_DUMP_IR
  md_channel_info_->RecordBatchQueue(gpu_item_connector_->size());
  md_channel_info_->RecordPreprocessBatch(0);
#endif
  std::vector<device::DataItemGpu> items;
  RETURN_IF_NOT_OK(gpu_item_connector_->Pop(0, &items));
  int64_t send_batch = 0;
  bool is_open = false;
  uint32_t handle = INVALID_HANDLE;
  auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
  while (!items.empty() && !GpuBufferMgr::GetInstance().IsClosed()) {
#ifdef ENABLE_DUMP_IR
    md_channel_info_->RecordBatchQueue(gpu_item_connector_->size());
    md_channel_info_->RecordPreprocessBatch(send_batch);
    md_channel_info_->RecordPushStartTime();
#endif
    if (!is_open) {
      std::vector<size_t> data_size;
      for (int32_t index = 0; index < items.size(); index++) {
        data_size.push_back(items[index].data_len_);
      }
      handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function);
      if (handle == INVALID_HANDLE) {
        return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
                      "[Internal ERROR] Failed to open channel for sending data.");
      }
      is_open = true;
    }

    // Data prefetch only when PS mode enables cache.
    if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name_, items[0].data_ptr_, items[0].data_len_,
                                                        items[0].data_type_)) {
      return Status(StatusCode::kMDTimeOut, __LINE__, __FILE__,
                    "Failed to prefetch data in current PS mode(cache data when sending).");
    }
    RETURN_IF_NOT_OK(RetryPushData(handle, items));
    send_batch++;
#ifndef ENABLE_SECURITY
    if (is_profiling_enable) {
      uint64_t end_time = ProfilingTime::GetCurMilliSecond();
      // record push data time
      profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost, end_time);
      int32_t batch_cost = (int32_t)(end_time - batch_start_time);
      // record batch time
      profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost, end_time);
      // record pipeline time
      profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost, end_time);
      batch_start_time = end_time;
      // record connector depth
      profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size, end_time);
      connector_size = gpu_item_connector_->size();
      connector_capacity = gpu_item_connector_->capacity();
    }
#endif
#ifdef ENABLE_DUMP_IR
    md_channel_info_->RecordBatchQueue(gpu_item_connector_->size());
    md_channel_info_->RecordPreprocessBatch(send_batch);
    md_channel_info_->RecordPushEndTime();
#endif
    if (total_batch_ > 0 && send_batch >= total_batch_) {
      break;
    }
    if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
      auto rc = gpu_item_connector_->Pop(0, &items);
      // If the dataset sends more batches than the GPU side consumes, the GPU may core dump
      // because it gets no notify signal, so close the queue explicitly on error.
      if (rc.IsError()) {
        GpuBufferMgr::GetInstance().Close(handle);
        GpuBufferMgr::GetInstance().CloseConfirm();
        return rc;
      }
    } else {
      break;
    }
  }

  // Use this flag to judge whether an exception was raised.
  if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
    send_finished_ = true;
  }
  tree_->SetFinished();
  MS_LOG(INFO) << "Device queue sent " << send_batch << " batches.";

  GpuBufferMgr::GetInstance().Close(handle);
  GpuBufferMgr::GetInstance().CloseConfirm();
  return Status::OK();
}

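// Keep retrying the push into the GPU buffer until it succeeds, the queue is closed, the task is
// interrupted, or stop_send_ is set; a malformed row (mismatched types/shapes) fails immediately.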
Status DeviceQueueOp::RetryPushData(unsigned int handle, const std::vector<DataItemGpu> &items) {
  bool flag_log = false;
  while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) {
    BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
    if (ret) {
      if (ret == BlockQueueStatus_T::ERROR_INPUT) {
        return Status(
          StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
          "Invalid data, the types or shapes of the current row differ from the previous row (e.g. a batch operation "
          "with drop_remainder=False, or images not resized to the same size, will cause shapes to differ).");
      } else {
        if (!stop_send_) {
          if (!flag_log) {
            MS_LOG(DEBUG) << "Retry pushing data...";
            flag_log = true;
          }
          continue;
        }
        break;
      }
    } else {
      break;
    }
  }
  return Status::OK();
}

// WorkerEntry of DeviceQueueOp only does multi-threaded memcpy, as a performance optimization.
Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
  // Every thread that uses the CUDA API should call SetThreadDevice
  RETURN_IF_NOT_OK(SetThreadDevice());
  TaskManager::FindMe()->Post();
  TensorRow current_row;
  uint32_t batch_num = 0;
  RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
  while (!current_row.quit() && !GpuBufferMgr::GetInstance().IsClosed()) {
    std::vector<device::DataItemGpu> items;
    for (int i = 0; i < current_row.size(); i++) {
      device::DataItemGpu data_item;
      data_item.data_len_ = static_cast<size_t>(current_row[i]->SizeInBytes());
      data_item.data_ptr_ = nullptr;
      data_item.worker_id_ = worker_id;
      items.push_back(data_item);
    }
    RETURN_IF_NOT_OK(MallocForGPUData(&items, current_row, worker_id));
    RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
    batch_num++;

    RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
  }

  MS_LOG(INFO) << "Device queue worker id " << worker_id << " processed " << batch_num << " batches.";
  // Add an empty vector as the quit flag.
  std::vector<device::DataItemGpu> items;
  RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
  return Status::OK();
}

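// Producer side of the GPU pipeline: fetch rows from the child iterator, filter meta columns,
// and distribute the rows round-robin to the worker receive queues; a quit-flag row per worker
// signals the end of the stream.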
Status DeviceQueueOp::SendDataToGPU() {
  RETURN_IF_NOT_OK(LaunchParallelCopyThread());
  MS_LOG(INFO) << "Device queue, sending data to GPU.";
#ifndef ENABLE_SECURITY
  uint64_t batch_record_start, batch_record_end;
  batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
  TensorRow current_row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
  first_fetch_flag_ = true;
  int64_t num_buf = 0;
  bool is_break_loop = false;
  while (!current_row.eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
    while (!current_row.eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
      RETURN_IF_NOT_OK(FilterMetadata(&current_row));
      RETURN_IF_NOT_OK(CheckExceptions(current_row));
#ifndef ENABLE_SECURITY
      DetectPerBatchTime(&batch_record_start, &batch_record_end);
#endif
      PrintBeginInfoWhenFirstBatch(first_push_flag_);
      RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_row)));
      PrintEndInfoWhenFirstBatch(&first_push_flag_);
#ifndef ENABLE_SECURITY
      batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
      if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
        RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
      } else {
        is_break_loop = true;
      }
    }

    if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
    } else {
      is_break_loop = true;
    }
  }

  for (uint32_t index = 0; index < num_workers_; index++) {
    TensorRow quit_flag(TensorRow::kFlagQuit);
    RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit_flag)));
  }

  MS_LOG(INFO) << "Device queue received " << num_buf - num_workers_ << " batches.";
  return Status::OK();
}

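// Allocate staging memory from the worker's pool for every item in the row, record each tensor's
// data type, and copy the tensor contents into the allocated buffer with memcpy_s.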
Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row,
                                       const int32_t &worker_id) {
  int i = 0;
  for (auto &sub_item : *items) {
    auto rc = pool_[worker_id]->Allocate(sub_item.data_len_, &sub_item.data_ptr_);
    if (rc.IsError() || sub_item.data_ptr_ == nullptr) {
      return Status(StatusCode::kMDOutOfMemory, __LINE__, __FILE__, "Memory malloc failed.");
    }
    if (curr_row[i] == nullptr) {
      MS_LOG(ERROR) << "The pointer curr_row[" << i << "] is null";
      return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "TensorRow 'curr_row' contains nullptr.");
    }
    sub_item.data_type_ = curr_row[i]->type().ToString();
    const unsigned char *column_data = curr_row[i]->GetBuffer();
    if (memcpy_s(sub_item.data_ptr_, sub_item.data_len_, column_data,
                 static_cast<uint32_t>(curr_row[i++]->SizeInBytes())) != 0) {
      MS_LOG(ERROR) << "memcpy_s failed!";
      return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "memcpy failed when copying data with memcpy_s.");
    }
  }

  return Status::OK();
}
#endif

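// CPU path: simply drain the child iterator and log the feature sizes; nothing is pushed to a
// device queue.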
Status DeviceQueueOp::SendDataToCPU() {
  MS_LOG(INFO) << "Device queue, sending data to CPU.";
  int64_t total_batch = 0;

  while (!(child_iterator_->EofHandled())) {
    TensorRow curr_row;
    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));

    if (!first_fetch_flag_) {
      first_fetch_flag_ = true;
    }
    if (!curr_row.empty()) {
      for (auto &tensor : curr_row) {
        MS_LOG(DEBUG) << "Feature size is " << tensor->SizeInBytes() << ".";
      }
      total_batch++;
      if (stop_send_) break;
    }
  }

  MS_LOG(INFO) << "Device queue total batch is " << total_batch << ".";

  return Status::OK();
}

void DeviceQueueOp::Print(std::ostream &out, bool show_all) const {
  if (!show_all) {
    // Call the super class for displaying any common 1-liner info
    PipelineOp::Print(out, show_all);
    // Then show any custom derived-internal 1-liner info for this op
    out << "\n";
  } else {
    // Call the super class for displaying any common detailed info
    PipelineOp::Print(out, show_all);
    // Then show any custom derived-internal stuff
    out << "\nChannel name: " << channel_name_ << "\nPrefetch size: " << prefetch_size_ << "\n\n";
  }
}

#ifndef ENABLE_SECURITY
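// Record per-batch profiling samples: push time, total batch time, pipeline time (batch time
// minus push time), and connector depth.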
void DeviceQueueOp::ProfilingRecorder(bool is_profiling_enable, std::shared_ptr<DeviceQueueTracing> profiling_node,
                                      int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time,
                                      uint64_t *end_time, int32_t connector_capacity, int32_t connector_size) {
  // Record the pipeline profiling info
  if (is_profiling_enable) {
    *end_time = ProfilingTime::GetCurMilliSecond();
    // record push tdt time
    profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch + 1, tdt_cost, *end_time);
    int32_t batch_cost = (int32_t)(*end_time - *batch_start_time);
    // record batch time
    profiling_node->Record(TIME, BATCH_TIME, send_batch + 1, batch_cost, *end_time);
    // record pipeline time
    profiling_node->Record(TIME, PIPELINE_TIME, send_batch + 1, batch_cost - tdt_cost, *end_time);
    *batch_start_time = *end_time;
    // record connector depth
    profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch + 1, connector_size, *end_time);
  }
}

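// Background watchdog: poll every 2 seconds until the first batch is fetched, warning up to
// three times if fetching the first batch takes longer than kTimeOutMilliSeconds.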
Status DeviceQueueOp::DetectFirstBatch() {
  TaskManager::FindMe()->Post();
  uint8_t count_num = 0;
  uint64_t temp_start_time = ProfilingTime::GetCurMilliSecond();
  while (true) {
    RETURN_IF_INTERRUPTED();
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    uint64_t temp_end_time = ProfilingTime::GetCurMilliSecond();
    // Exit once the first batch has been fetched, or after the warning below has been printed
    // three or more times without fetching the first batch.
    if (first_fetch_flag_ == true || count_num > 2) {
      break;
    } else if (temp_end_time - temp_start_time > kTimeOutMilliSeconds) {
      count_num++;
      MS_LOG(WARNING) << "Bad performance attention, it has waited more than 25 seconds and is still unable to fetch "
                         "the first batch of data from the dataset pipeline, which might cause a `GetNext` timeout "
                         "problem. You may test dataset processing performance (with creating a dataset iterator) "
                         "and optimize it. Note: the shuffle operation is turned on by default when loading the "
                         "dataset, which may affect the first batch loading time.";
    }
  }
  return Status::OK();
}

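// Warn if fetching a single batch from the dataset pipeline took longer than kTimeOutMilliSeconds.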
void DeviceQueueOp::DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time) {
  *end_time = ProfilingTime::GetCurMilliSecond();
  if (*end_time - *start_time > kTimeOutMilliSeconds) {
    MS_LOG(WARNING) << "Bad performance attention, it takes more than 25 seconds to fetch a batch of data from the "
                       "dataset pipeline, which might cause a `GetNext` timeout problem. You may test dataset "
                       "processing performance (with creating a dataset iterator) and optimize it.";
  }
}

void DeviceQueueOp::PrintBeginInfoWhenFirstBatch(const bool &first_push_flag) {
  if (first_push_flag != true) {
    MS_LOG(INFO) << "Loading dataset and begin to push first batch into device ...";
  }
}

void DeviceQueueOp::PrintEndInfoWhenFirstBatch(bool *first_push_flag) {
  if (!first_push_flag) {
    MS_LOG(WARNING) << "First batch flag: first_push_flag is nullptr";
    return;
  }
  if (*first_push_flag != true) {
    MS_LOG(INFO) << "Loading dataset and pushed the first batch into device successfully.";
    *first_push_flag = true;
  }
}
#endif
}  // namespace dataset
}  // namespace mindspore