/**
 * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "minddata/dataset/engine/datasetops/device_queue_op.h"

#include <algorithm>
#include <iostream>
#include <memory>
#include <unordered_map>

#include "minddata/dataset/engine/dataset_iterator.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
namespace dataset {
DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
                             bool send_epoch_end, int32_t total_batch, bool create_data_info_queue)
    : PipelineOp(1),
      channel_name_(channel_name),
      device_type_(device_type),
      device_id_(device_id),
      prefetch_size_(prefetch_size),
      send_epoch_end_(send_epoch_end),
      stop_send_(false),
      send_finished_(false),
      total_batch_(total_batch),
      create_data_info_queue_(create_data_info_queue),
      data_info_queue_ptr_(nullptr),
      first_fetch_flag_(false),
      first_push_flag_(false) {
#ifdef ENABLE_GPUQUE
  // Get the total device num of the current machine
  int32_t device_count = 0;
  cudaGetDeviceCount(&device_count);
  std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
  rank_id_ = cfg->rank_id();  // Get the current rank_id
  if (device_count > 0) {
    rank_id_ = rank_id_ % device_count;
  }
  // Be careful when modifying num_workers_ and queue_capacity_: we suggest keeping
  // num_workers_ * queue_capacity_ no greater than 16, because each worker owns one
  // circular_pool with 1 GB of pinned memory, so the product must be limited to avoid
  // memory overload.
  num_workers_ = kDeviceQueGpuNumThreads;
  queue_capacity_ = kDeviceQueGpuQueueCapacity;
#endif
#ifdef ENABLE_TDTQUE
  ascend_keep_waiting_ = true;
  tdtInstancePtr = std::make_shared<TdtPlugin>(channel_name_, device_id_);
#endif
#ifdef ENABLE_DUMP_IR
  md_channel_info_ = std::make_shared<MDChannelInfo>(channel_name_);
#endif
}

DeviceQueueOp::~DeviceQueueOp() {
#ifdef ENABLE_DUMP_IR
  std::string rdr_msg = md_channel_info_->ToString();
  if (!send_finished_ && !rdr_msg.empty()) {
    MS_LOG(WARNING) << rdr_msg;
  }
#endif
}

#ifdef ENABLE_GPUQUE
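// Release a data buffer back to the memory pool owned by the given worker.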
void DeviceQueueOp::ReleaseData(void *addr, int32_t worker_id) {
  if (addr != nullptr) {
    pool_[worker_id]->Deallocate(addr);
  }
}
#endif

Status DeviceQueueOp::EoeReceived(int32_t worker_id) {
  state_ = OpState::kDeOpIdle;
  return Status::OK();
}

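// Remove columns whose name starts with kDftMetaColumnPrefix from the row, keeping the
// remaining columns in their original order; fail if nothing would be left to send.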
Status DeviceQueueOp::FilterMetadata(TensorRow *row) {
  std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
  TensorRow output;
  TensorRow tmp = *row;
  std::vector<size_t> to_keep_indices;
  for (auto column : current_name_id_map) {
    std::string column_name = column.first;
    // Filter out meta columns, i.e. columns whose name starts with kDftMetaColumnPrefix
    size_t pos = column_name.find(kDftMetaColumnPrefix);
    if (pos != std::string::npos && pos == 0) {
      continue;
    }
    to_keep_indices.push_back(column.second);
  }
  if (to_keep_indices.size() == 0) {
    std::string err_msg = "No effective column found, maybe all columns are meta columns and have been filtered. ";
    err_msg += "If you want to output a meta column, please rename it to a new name which does not start with ";
    err_msg += "\"" + std::string(kDftMetaColumnPrefix) + "\"";
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  std::sort(to_keep_indices.begin(), to_keep_indices.end());
  (void)std::transform(to_keep_indices.begin(), to_keep_indices.end(), std::back_inserter(output),
                       [&tmp](const auto &it) { return std::move(tmp[it]); });
  *row = std::move(output);
  return Status::OK();
}

Status DeviceQueueOp::CheckExceptions(const TensorRow &row) const {
  // This method checks if the row meets the conditions to be sent to TDT
  for (const auto &item : row) {
    CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(), "Invalid data, cannot send string tensor to device.");
    CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Invalid data, cannot send tensor with no data to device.");
  }
  return Status::OK();
}

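// Main entry of the op: start the first-batch watchdog (when security mode is off),
// then dispatch rows to the Ascend, GPU or CPU send path according to device_type_.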
Status DeviceQueueOp::operator()() {
#ifndef ENABLE_SECURITY
  RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
    "Detect first batch", std::bind(&DeviceQueueOp::DetectFirstBatch, this), nullptr, id()));
#endif
  TaskManager::FindMe()->Post();
  child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);

#ifdef ENABLE_DUMP_IR
  if (md_channel_info_ == nullptr) {
    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "[Internal ERROR] RDR module init failed.");
  }
#endif
  if (device_type_ == DeviceType::Ascend) {
#ifdef ENABLE_TDTQUE
    if (create_data_info_queue_) {
      // This block races with GetDataInfo(), so whichever arrives here first
      // does the initialization work.
      {
        std::unique_lock<std::mutex> lock(data_info_mutex_);
        if (data_info_queue_ptr_ == nullptr) {
          data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
          RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
        }
      }
    }
    if (tdtInstancePtr->acl_handle_ == nullptr) {
      RETURN_STATUS_UNEXPECTED("Create channel for sending data failed, please check DEVICE ID setting.");
    }
    RETURN_IF_NOT_OK(SendDataToAscend());
#endif
  } else if (device_type_ == DeviceType::GPU) {
#ifdef ENABLE_GPUQUE
    RETURN_IF_NOT_OK(SendDataToGPU());
#endif
  } else if (device_type_ == DeviceType::CPU) {
    RETURN_IF_NOT_OK(SendDataToCPU());
  }

  return Status::OK();
}

#ifdef ENABLE_TDTQUE
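// Drain the child iterator and push every row to the Ascend device through the TDT channel,
// recording profiling / RDR information, honoring stop_send_ and the sending-batches limit,
// and forwarding an END_OF_SEQUENCE marker at each epoch end when send_epoch_end_ is set.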
Status DeviceQueueOp::SendDataToAscend() {
  MS_LOG(INFO) << "Device queue, sending data to Ascend.";
#ifndef ENABLE_SECURITY
  uint64_t batch_start_time = 0;
  uint64_t end_time = 0;
  uint64_t batch_record_start = 0;
  uint64_t batch_record_end = 0;
#endif
  int64_t send_batch = 0;
  int32_t tdt_cost = 0;
#ifndef ENABLE_SECURITY
  int32_t connector_size = 0;
  int32_t connector_capacity = 0;
#endif
  bool is_break_loop = false;

  std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
  int64_t sending_num = cfg->sending_batches();  // Get the current sending_num

#ifndef ENABLE_SECURITY
  std::shared_ptr<DeviceQueueTracing> profiling_node;
  bool is_profiling_enable = tree_->GetProfilingManager()->IsProfilingEnable();
  if (is_profiling_enable) {
    std::shared_ptr<Tracing> node;
    RETURN_IF_NOT_OK(tree_->GetProfilingManager()->GetTracingNode(kDeviceQueueTracingName, &node));
    profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
    batch_start_time = ProfilingTime::GetCurMilliSecond();
    connector_capacity = ChildOpConnectorCapacity();
  }
#else
  bool is_profiling_enable = false;
#endif
#ifdef ENABLE_DUMP_IR
  md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
  md_channel_info_->RecordPreprocessBatch(0);
#endif
#ifndef ENABLE_SECURITY
  batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
  TensorRow curr_row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
  first_fetch_flag_ = true;
  while (!curr_row.eof() && !is_break_loop) {
    while (!curr_row.eoe() && !is_break_loop) {
      RETURN_IF_NOT_OK(FilterMetadata(&curr_row));
      RETURN_IF_NOT_OK(CheckExceptions(curr_row));
      WaitContinueSignal();
#ifdef ENABLE_DUMP_IR
      md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
      md_channel_info_->RecordPreprocessBatch(send_batch);
      md_channel_info_->RecordPushStartTime();
#endif
#ifndef ENABLE_SECURITY
      DetectPerBatchTime(&batch_record_start, &batch_record_end);
#endif
      PrintBeginInfoWhenFirstBatch(first_push_flag_);
      RETURN_IF_NOT_OK(SendRowToTdt(curr_row, is_profiling_enable, &tdt_cost));
      PrintEndInfoWhenFirstBatch(&first_push_flag_);
#ifndef ENABLE_SECURITY
      ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
                        connector_capacity, connector_size);
      batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
      send_batch++;
#ifdef ENABLE_DUMP_IR
      md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
      md_channel_info_->RecordPreprocessBatch(send_batch);
      md_channel_info_->RecordPushEndTime();
#endif

      if (total_batch_ > 0 && send_batch >= total_batch_) {
        is_break_loop = true;
        break;
      }

      // Wait while sending_num is not 0 and the number of batches already sent has reached sending_num
      LimitSendingBatches(send_batch, &sending_num, cfg);

#ifndef ENABLE_SECURITY
      if (is_profiling_enable) {
        connector_size = ChildOpConnectorSize();
        connector_capacity = ChildOpConnectorCapacity();
      }
#endif
      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
    }
    if (curr_row.eoe() && send_epoch_end_) {
      TensorRow dummy_row;
      auto status = tdtInstancePtr->hostPush(dummy_row, true, channel_name_, is_profiling_enable, tdt_cost,
                                             ACL_TENSOR_DATA_END_OF_SEQUENCE);
      if (status != Status::OK()) {
        if (stop_send_) {
          send_finished_ = true;
          MS_LOG(INFO) << "stop_send received";
          return Status::OK();
        }
        return Status(StatusCode::kMDTDTPushFailure,
                      "TDT push data into device failed. Check the first error or the traceback first. Possible"
                      " checks: 1) if training is not ready and is still in the graph compiling stage, check errors"
                      " raised by network operators or the environment configuration; 2) if the interruption happened"
                      " in the middle of training, check whether the dataset sending num and the network training num"
                      " mismatch; 3) if this error is raised at the end of training, ignore it; 4) in other cases,"
                      " check the Ascend host log or the info log, or search for this error in MindSpore's FAQ.");
      }
      MS_LOG(INFO) << "An epoch has been sent, now stop sending data.";
      stop_send_ = true;
    }
#ifndef ENABLE_SECURITY
    if (is_profiling_enable) {
      connector_size = ChildOpConnectorSize();
      connector_capacity = ChildOpConnectorCapacity();
      tree_->SetEpochEnd();
    }
#endif
    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
  }

  // Now we use this flag to judge whether an exception was raised.
  if (stop_send_ || !TaskManager::FindMe()->Interrupted()) {
    send_finished_ = true;
  }
  tree_->SetFinished();
  MS_LOG(INFO) << "Device queue send " << send_batch << " batch(es).";

  return Status::OK();
}

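// Block (sleeping briefly between checks) while stop_send_ is set and Ascend is still waiting,
// i.e. until the caller signals that sending may continue.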
void DeviceQueueOp::WaitContinueSignal() const {
  while (stop_send_ && ascend_keep_waiting_) {
    MS_LOG(DEBUG) << "stop_send flag is set, waiting for continue signal...";
    std::this_thread::sleep_for(std::chrono::microseconds(100));
  }
}

void DeviceQueueOp::LimitSendingBatches(int64_t send_batch, int64_t *sending_num, std::shared_ptr<ConfigManager> cfg) {
  while (send_batch >= *sending_num) {
    *sending_num = cfg->sending_batches();
    if (*sending_num == 0) {
      break;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    MS_LOG(INFO) << "Waited 10 milliseconds; the required sending batches is: " << *sending_num
                 << ", and the current sent batches is: " << send_batch;
  }
}

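// Push a single row to the device through the TDT channel; when the data-info queue is
// enabled, also record the type and shape of every tensor in the row for GetDataInfo().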
Status DeviceQueueOp::SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost) {
  auto status = tdtInstancePtr->hostPush(curr_row, true, channel_name_, is_profiling_enable, *tdt_cost);
  if (status != Status::OK()) {
    if (stop_send_) {
      MS_LOG(INFO) << "stop_send received";
      return Status::OK();
    }
    return Status(StatusCode::kMDTDTPushFailure,
                  "TDT push data into device failed. Check the first error or the traceback first. Possible"
                  " checks: 1) if training is not ready and is still in the graph compiling stage, check errors"
                  " raised by network operators or the environment configuration; 2) if the interruption happened"
                  " in the middle of training, check whether the dataset sending num and the network training num"
                  " mismatch; 3) if this error is raised at the end of training, ignore it; 4) in other cases,"
                  " check the Ascend host log or the info log, or search for this error in MindSpore's FAQ.");
  }
  if (create_data_info_queue_) {
    DATA_INFO data_info;
    (void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
                         [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
    RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
  }
  return Status::OK();
}
#endif

#ifdef ENABLE_TDTQUE
Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
  if (!create_data_info_queue_) {
    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "DataInfo queue is not created.");
  }
  // This block races with operator(), so whichever arrives here first
  // does the initialization work.
  {
    std::unique_lock<std::mutex> lock(data_info_mutex_);
    if (data_info_queue_ptr_ == nullptr) {
      data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
      RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
    }
  }
  RETURN_IF_NOT_OK(data_info_queue_ptr_->PopFront(data_info));
  return Status::OK();
}
#else
Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
  return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "GetDataInfo is not supported yet.");
}
#endif

#ifdef ENABLE_GPUQUE
Status DeviceQueueOp::SetThreadDevice() {
  // Without cudaSetDevice, CUDA memory is allocated on GPU:0 by default,
  // which overloads that device in distributed scenarios.
  auto ret = cudaSetDevice(rank_id_);
  if (ret != cudaSuccess) {
    std::string err;
    err += "cudaSetDevice failed, ret[";
    err += std::to_string(static_cast<int>(ret));
    err += "], ";
    err += cudaGetErrorString(ret);
    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, err);
  }
  return Status::OK();
}

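// Create one circular memory pool (pinned memory) per worker, set up the GPU item connector and
// the per-worker receive queues, then launch the copy workers and the PushDataToGPU task.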
Status DeviceQueueOp::LaunchParallelCopyThread() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  // Every thread that uses the CUDA API should call SetThreadDevice
  RETURN_IF_NOT_OK(SetThreadDevice());
  // CircularPool may not be safe in multi-threaded scenarios, so each worker gets its own pool
  for (int i = 0; i < num_workers_; i++) {
    std::shared_ptr<MemoryPool> pool;
    RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool, -1, kDeviceQueGpuThreadMemory, false, true));
    pool_.push_back(pool);
  }
  gpu_item_connector_ = std::make_unique<GpuItemConnector>(num_workers_, 1, queue_capacity_);
  receive_queues_.Init(num_workers_, queue_capacity_);
  RETURN_IF_NOT_OK(receive_queues_.Register(tree_->AllTasks()));
  RETURN_IF_NOT_OK(
    tree_->LaunchWorkers(num_workers_, std::bind(&DeviceQueueOp::WorkerEntry, this, std::placeholders::_1), "", id()));
  RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue",
                                                      std::bind(&DeviceQueueOp::PushDataToGPU, this), nullptr, id()));

  return Status::OK();
}

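// Consumer side of the GPU path: pop copied batches from gpu_item_connector_, open the
// GpuBufferMgr channel on the first batch, optionally prefetch for PS cache mode, and
// push the data to the device while recording profiling / RDR information.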
Status DeviceQueueOp::PushDataToGPU() {
  RETURN_UNEXPECTED_IF_NULL(tree_);
  // Every thread that uses the CUDA API should call SetThreadDevice
  RETURN_IF_NOT_OK(SetThreadDevice());
  TaskManager::FindMe()->Post();
#ifndef ENABLE_SECURITY
  uint64_t batch_start_time = 0;
  int32_t push_cost = 0;
  int32_t connector_size = 0;
  int32_t connector_capacity = 0;
  std::shared_ptr<DeviceQueueTracing> profiling_node;
  bool is_profiling_enable = tree_->GetProfilingManager()->IsProfilingEnable();
  if (is_profiling_enable) {
    std::shared_ptr<Tracing> node;
    RETURN_IF_NOT_OK(tree_->GetProfilingManager()->GetTracingNode(kDeviceQueueTracingName, &node));
    profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
    batch_start_time = ProfilingTime::GetCurMilliSecond();
    connector_capacity = gpu_item_connector_->capacity();
  }
#endif
#ifdef ENABLE_DUMP_IR
  md_channel_info_->RecordBatchQueue(gpu_item_connector_->size());
  md_channel_info_->RecordPreprocessBatch(0);
#endif
  std::vector<device::DataItemGpu> items;
  RETURN_IF_NOT_OK(gpu_item_connector_->Pop(0, &items));
  int64_t send_batch = 0;
  bool is_open = false;
  uint32_t handle = INVALID_HANDLE;
  auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
  while (!items.empty() && !GpuBufferMgr::GetInstance().IsClosed()) {
#ifdef ENABLE_DUMP_IR
    md_channel_info_->RecordBatchQueue(gpu_item_connector_->size());
    md_channel_info_->RecordPreprocessBatch(send_batch);
    md_channel_info_->RecordPushStartTime();
#endif
    if (!is_open) {
      std::vector<size_t> data_size;
      for (int32_t index = 0; index < items.size(); index++) {
        data_size.push_back(items[index].data_len_);
      }
      handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function);
      if (handle == INVALID_HANDLE) {
        return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
                      "[Internal ERROR] Failed to open channel for sending data.");
      }
      is_open = true;
    }

    // Data prefetch happens only when PS mode enables the cache.
    if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name_, items[0].data_ptr_, items[0].data_len_,
                                                        items[0].data_type_)) {
      return Status(StatusCode::kMDTimeOut, __LINE__, __FILE__,
                    "Failed to prefetch data in current PS mode (cache data when sending).");
    }
    RETURN_IF_NOT_OK(RetryPushData(handle, items));
    send_batch++;
#ifndef ENABLE_SECURITY
    if (is_profiling_enable) {
      uint64_t end_time = ProfilingTime::GetCurMilliSecond();
      // record push data time
      profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch, push_cost, end_time);
      int32_t batch_cost = (int32_t)(end_time - batch_start_time);
      // record batch time
      profiling_node->Record(TIME, BATCH_TIME, send_batch, batch_cost, end_time);
      // record pipeline time
      profiling_node->Record(TIME, PIPELINE_TIME, send_batch, batch_cost - push_cost, end_time);
      batch_start_time = end_time;
      // record connector depth
      profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch, connector_size, end_time);
      connector_size = gpu_item_connector_->size();
      connector_capacity = gpu_item_connector_->capacity();
    }
#endif
#ifdef ENABLE_DUMP_IR
    md_channel_info_->RecordBatchQueue(gpu_item_connector_->size());
    md_channel_info_->RecordPreprocessBatch(send_batch);
    md_channel_info_->RecordPushEndTime();
#endif
    if (total_batch_ > 0 && send_batch >= total_batch_) {
      break;
    }
    if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
      auto rc = gpu_item_connector_->Pop(0, &items);
      // If the dataset sends more batches than the GPU computes, the GPU side may core dump because no signal
      // notifies it.
      if (rc.IsError()) {
        GpuBufferMgr::GetInstance().Close(handle);
        GpuBufferMgr::GetInstance().CloseConfirm();
        return rc;
      }
    } else {
      break;
    }
  }

  // Now we use this flag to judge whether an exception was raised.
  if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
    send_finished_ = true;
  }
  tree_->SetFinished();
  MS_LOG(INFO) << "Device queue send " << send_batch << " batch(es).";

  GpuBufferMgr::GetInstance().Close(handle);
  GpuBufferMgr::GetInstance().CloseConfirm();
  return Status::OK();
}

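// Keep retrying GpuBufferMgr::Push until it succeeds, the queue is closed, the task is
// interrupted, or stop_send_ is set; report invalid input (mismatched types/shapes) as an error.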
Status DeviceQueueOp::RetryPushData(unsigned int handle, const std::vector<DataItemGpu> &items) {
  bool flag_log = false;
  while (!GpuBufferMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) {
    BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME);
    if (ret) {
      if (ret == BlockQueueStatus_T::ERROR_INPUT) {
        return Status(
          StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
          "Invalid data, the types or shapes of the current row differ from the previous row (e.g. a batch operation "
          "with drop_remainder=False, or images not resized to the same size, will cause the shapes to differ).");
      } else {
        if (!stop_send_) {
          if (!flag_log) {
            MS_LOG(DEBUG) << "Retry pushing data...";
            flag_log = true;
          }
          continue;
        }
        break;
      }
    } else {
      break;
    }
  }
  return Status::OK();
}

// WorkerEntry of DeviceQueueOp just does multi-threaded memcpy for performance optimization.
Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
  // Every thread that uses the CUDA API should call SetThreadDevice
  RETURN_IF_NOT_OK(SetThreadDevice());
  TaskManager::FindMe()->Post();
  TensorRow current_row;
  uint32_t batch_num = 0;
  RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
  while (!current_row.quit() && !GpuBufferMgr::GetInstance().IsClosed()) {
    std::vector<device::DataItemGpu> items;
    for (int i = 0; i < current_row.size(); i++) {
      device::DataItemGpu data_item;
      data_item.data_len_ = static_cast<size_t>(current_row[i]->SizeInBytes());
      data_item.data_ptr_ = nullptr;
      data_item.worker_id_ = worker_id;
      items.push_back(data_item);
    }
    RETURN_IF_NOT_OK(MallocForGPUData(&items, current_row, worker_id));
    RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
    batch_num++;

    RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
  }

  MS_LOG(INFO) << "Device queue worker id " << worker_id << " processed " << batch_num << " batch(es).";
  // Add an empty vector as the quit flag.
  std::vector<device::DataItemGpu> items;
  RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
  return Status::OK();
}

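// Producer side of the GPU path: fetch rows from the child iterator, validate them, and
// round-robin them to the worker receive queues; finally send one quit-flag row per worker.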
Status DeviceQueueOp::SendDataToGPU() {
  RETURN_IF_NOT_OK(LaunchParallelCopyThread());
  MS_LOG(INFO) << "Device queue, sending data to GPU.";
#ifndef ENABLE_SECURITY
  uint64_t batch_record_start, batch_record_end;
  batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
  TensorRow current_row;
  RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
  first_fetch_flag_ = true;
  int64_t num_buf = 0;
  bool is_break_loop = false;
  while (!current_row.eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
    while (!current_row.eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
      RETURN_IF_NOT_OK(FilterMetadata(&current_row));
      RETURN_IF_NOT_OK(CheckExceptions(current_row));
#ifndef ENABLE_SECURITY
      DetectPerBatchTime(&batch_record_start, &batch_record_end);
#endif
      PrintBeginInfoWhenFirstBatch(first_push_flag_);
      RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_row)));
      PrintEndInfoWhenFirstBatch(&first_push_flag_);
#ifndef ENABLE_SECURITY
      batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
      if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
        RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
      } else {
        is_break_loop = true;
      }
    }

    if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
      RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
    } else {
      is_break_loop = true;
    }
  }

  for (uint32_t index = 0; index < num_workers_; index++) {
    TensorRow quit_flag(TensorRow::kFlagQuit);
    RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit_flag)));
  }

  MS_LOG(INFO) << "Device queue received " << num_buf - num_workers_ << " batch(es).";
  return Status::OK();
}

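// Allocate a buffer from the worker's memory pool for every item and copy the corresponding
// tensor data of curr_row into it with memcpy_s, recording the data type of each column.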
Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row,
                                       const int32_t &worker_id) {
  int i = 0;
  for (auto &sub_item : *items) {
    auto rc = pool_[worker_id]->Allocate(sub_item.data_len_, &sub_item.data_ptr_);
    if (rc.IsError() || sub_item.data_ptr_ == nullptr) {
      return Status(StatusCode::kMDOutOfMemory, __LINE__, __FILE__, "Memory malloc failed.");
    }
    if (curr_row[i] == nullptr) {
      MS_LOG(ERROR) << "The pointer curr_row[" << i << "] is null";
      return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "TensorRow 'curr_row' contains nullptr.");
    }
    sub_item.data_type_ = curr_row[i]->type().ToString();
    const unsigned char *column_data = curr_row[i]->GetBuffer();
    if (memcpy_s(sub_item.data_ptr_, sub_item.data_len_, column_data,
                 static_cast<uint32_t>(curr_row[i++]->SizeInBytes())) != 0) {
      MS_LOG(ERROR) << "memcpy_s failed!";
      return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "memcpy failed when using memcpy_s to copy.");
    }
  }

  return Status::OK();
}
#endif

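// CPU path: iterate over the child output without pushing to a device; only count the
// batches and log the tensor sizes at DEBUG level until EOF or stop_send_ is set.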
Status DeviceQueueOp::SendDataToCPU() {
  MS_LOG(INFO) << "Device queue, sending data to CPU.";
  int64_t total_batch = 0;

  while (!(child_iterator_->EofHandled())) {
    TensorRow curr_row;
    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));

    if (!first_fetch_flag_) {
      first_fetch_flag_ = true;
    }
    if (!curr_row.empty()) {
      for (auto &tensor : curr_row) {
        MS_LOG(DEBUG) << "Feature size is " << tensor->SizeInBytes() << ".";
      }
      total_batch++;
      if (stop_send_) break;
    }
  }

  MS_LOG(INFO) << "Device queue total batch is " << total_batch << ".";

  return Status::OK();
}

void DeviceQueueOp::Print(std::ostream &out, bool show_all) const {
  if (!show_all) {
    // Call the super class for displaying any common 1-liner info
    PipelineOp::Print(out, show_all);
    // Then show any custom derived-internal 1-liner info for this op
    out << "\n";
  } else {
    // Call the super class for displaying any common detailed info
    PipelineOp::Print(out, show_all);
    // Then show any custom derived-internal stuff
    out << "\nChannel name: " << channel_name_ << "\nPrefetch size: " << prefetch_size_ << "\n\n";
  }
}

#ifndef ENABLE_SECURITY
void DeviceQueueOp::ProfilingRecorder(bool is_profiling_enable, std::shared_ptr<DeviceQueueTracing> profiling_node,
                                      int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time,
                                      uint64_t *end_time, int32_t connector_capacity, int32_t connector_size) {
  // Record the pipeline profiling info
  if (is_profiling_enable) {
    *end_time = ProfilingTime::GetCurMilliSecond();
    // record push tdt time
    profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch + 1, tdt_cost, *end_time);
    int32_t batch_cost = (int32_t)(*end_time - *batch_start_time);
    // record batch time
    profiling_node->Record(TIME, BATCH_TIME, send_batch + 1, batch_cost, *end_time);
    // record pipeline time
    profiling_node->Record(TIME, PIPELINE_TIME, send_batch + 1, batch_cost - tdt_cost, *end_time);
    *batch_start_time = *end_time;
    // record connector depth
    profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch + 1, connector_size, *end_time);
  }
}

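// Watchdog task: wake up every two seconds and warn (up to three times) if the first batch
// has still not been fetched from the dataset pipeline after kTimeOutMilliSeconds.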
Status DeviceQueueOp::DetectFirstBatch() {
  TaskManager::FindMe()->Post();
  uint8_t count_num = 0;
  uint64_t temp_start_time = ProfilingTime::GetCurMilliSecond();
  while (true) {
    RETURN_IF_INTERRUPTED();
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    uint64_t temp_end_time = ProfilingTime::GetCurMilliSecond();
    // If the first batch has been fetched, or we have warned 3 or more times without fetching it,
    // exit; the warning has already been printed.
    if (first_fetch_flag_ == true || count_num > 2) {
      break;
    } else if (temp_end_time - temp_start_time > kTimeOutMilliSeconds) {
      count_num++;
      MS_LOG(WARNING) << "Bad performance attention, it has waited more than 25 seconds and is still unable to fetch "
                         "the first batch of data from the dataset pipeline, which might result in a `GetNext` "
                         "timeout problem. You may test the dataset processing performance (by creating a dataset "
                         "iterator) and optimize it. Note: the shuffle operation is turned on by default when "
                         "loading a Dataset, which may affect the first batch loading time.";
    }
  }
  return Status::OK();
}

void DeviceQueueOp::DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time) {
  *end_time = ProfilingTime::GetCurMilliSecond();
  if (*end_time - *start_time > kTimeOutMilliSeconds) {
    MS_LOG(WARNING) << "Bad performance attention, it takes more than 25 seconds to fetch a batch of data from the "
                       "dataset pipeline, which might result in a `GetNext` timeout problem. You may test the "
                       "dataset processing performance (by creating a dataset iterator) and optimize it.";
  }
}

void DeviceQueueOp::PrintBeginInfoWhenFirstBatch(const bool &first_push_flag) {
  if (first_push_flag != true) {
    MS_LOG(INFO) << "Loading dataset and beginning to push the first batch into device ...";
  }
}

void DeviceQueueOp::PrintEndInfoWhenFirstBatch(bool *first_push_flag) {
  if (!first_push_flag) {
    MS_LOG(WARNING) << "First batch flag: first_push_flag is nullptr";
    return;
  }
  if (*first_push_flag != true) {
    MS_LOG(INFO) << "Loading dataset and pushing the first batch into device was successful.";
    *first_push_flag = true;
  }
}
#endif
}  // namespace dataset
}  // namespace mindspore