1 /**
2 * Copyright 2019-2023 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "minddata/dataset/engine/datasetops/data_queue_op.h"
18
19 #include <algorithm>
20 #include <iostream>
21 #include <memory>
22 #include <unordered_map>
23
24 #include "minddata/dataset/engine/gpu_item_connector.h"
25 #include "minddata/dataset/engine/dataset_iterator.h"
26 #include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h"
27 #include "minddata/dataset/util/status.h"
28 #include "minddata/dataset/util/task_manager.h"
29 #ifdef WITH_BACKEND
30 #include "mindspore/ccsrc/include/backend/data_queue/data_queue_mgr.h"
31 #include "include/backend/distributed/embedding_cache/embedding_cache_utils.h"
32 #include "include/backend/distributed/embedding_cache/data_queue_manager.h"
33 #include "utils/ms_context.h"
34 #endif
35 namespace mindspore {
36 namespace dataset {
37 #ifdef WITH_BACKEND
38 using distributed::DataQueueManager;
39 using distributed::IdDataInfo;
40 using distributed::IndexDataInfo;
41 #endif
42
43 namespace {
ConvertTensorRowToDataQueueItem(const TensorRow & row)44 std::vector<DataQueueItem> ConvertTensorRowToDataQueueItem(const TensorRow &row) {
45 std::vector<device::DataQueueItem> items;
46 for (auto &i : row) {
47 device::DataQueueItem data_item;
48 data_item.data_len = static_cast<size_t>(i->SizeInBytes());
49 data_item.shapes = i->shape().AsVector();
50 data_item.data_ptr = const_cast<void *>(static_cast<const void *>(i->GetBuffer()));
51 data_item.data_type = i->type().ToString();
52 items.emplace_back(std::move(data_item));
53 }
54 return items;
55 }
56
57 #ifdef WITH_BACKEND
DeepCopyConvertTensorRowToDataQueueItem(const TensorRow & row)58 std::vector<DataQueueItem> DeepCopyConvertTensorRowToDataQueueItem(const TensorRow &row) {
59 std::vector<device::DataQueueItem> items;
60 for (auto &i : row) {
61 device::DataQueueItem data_item;
62 data_item.data_len = static_cast<size_t>(i->SizeInBytes());
63 data_item.shapes = i->shape().AsVector();
64 data_item.data_ptr = malloc(data_item.data_len);
65 MS_EXCEPTION_IF_NULL(data_item.data_ptr);
66 auto ret = memcpy_s(data_item.data_ptr, data_item.data_len, i->GetBuffer(), data_item.data_len);
67 if (ret != EOK) {
68 MS_LOG(EXCEPTION) << "Memcpy for data queue item failed, errno[" << ret << "]";
69 }
70 data_item.data_type = i->type().ToString();
71
72 items.emplace_back(std::move(data_item));
73 }
74 return items;
75 }
76 #endif
77
// Push an IdDataInfo whose only set field is end_of_epoch_ = true onto the id-data queue
// registered for `channel_name`. Does nothing when built without WITH_BACKEND.
// NOTE(review): the marker is handed over as a raw pointer; presumably the queue consumer deletes it - confirm.
void PushEpochEndToQueue(const std::string &channel_name) {
#ifdef WITH_BACKEND
  // GetDataQueue() yields a pair; its first member is the id-data queue.
  const auto &id_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name).first;
  MS_EXCEPTION_IF_NULL(id_data_queue);

  auto *data = new IdDataInfo();
  MS_EXCEPTION_IF_NULL(data);
  data->end_of_epoch_ = true;

  id_data_queue->Push(data);
#endif
}
88
// Push an IdDataInfo whose only set field is end_of_file_ = true onto the id-data queue
// registered for `channel_name`. Does nothing when built without WITH_BACKEND.
// NOTE(review): the marker is handed over as a raw pointer; presumably the queue consumer deletes it - confirm.
void PushFileEndToQueue(const std::string &channel_name) {
#ifdef WITH_BACKEND
  // GetDataQueue() yields a pair; its first member is the id-data queue.
  const auto &id_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name).first;
  MS_EXCEPTION_IF_NULL(id_data_queue);

  auto *data = new IdDataInfo();
  MS_EXCEPTION_IF_NULL(data);
  data->end_of_file_ = true;

  id_data_queue->Push(data);
#endif
}
99 } // namespace
DataQueueOp(const std::string channel_name,DeviceType device_type,int32_t device_id,bool send_epoch_end,int32_t total_batch,bool create_data_info_queue)100 DataQueueOp::DataQueueOp(const std::string channel_name, DeviceType device_type, int32_t device_id, bool send_epoch_end,
101 int32_t total_batch, bool create_data_info_queue)
102 : PipelineOp(1),
103 ascend_keep_waiting_(true),
104 num_workers_(kDeviceQueGpuNumThreads),
105 queue_capacity_(kDeviceQueGpuQueueCapacity),
106 channel_name_(channel_name),
107 device_type_(device_type),
108 device_id_(device_id),
109 send_epoch_end_(send_epoch_end),
110 stop_send_(false),
111 send_finished_(false),
112 total_batch_(total_batch),
113 create_data_info_queue_(create_data_info_queue),
114 data_info_queue_ptr_(nullptr),
115 first_fetch_flag_(false),
116 first_push_flag_(false),
117 send_summary_({}) {
118 std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
119 dynamic_shape_ = cfg->dynamic_shape();
120
121 // Be careful when try to modified these num_workers_ and queue_capacity_,
122 // and we suggest num_workers_ * queue_capacity_ not greater than 16, because
123 // one worker one circular_pool with 1G pin memory, so num_workers_ * queue_capacity_
124 // must limit to avoid memory overload
125 #ifdef WITH_BACKEND
126 MS_EXCEPTION_IF_NULL(MsContext::GetInstance());
127 if (MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kAscendDevice) {
128 ascend_data_queue_ =
129 device::DataQueueMgr::GetInstance().CreateDataQueue(kAscendDevice, channel_name, dynamic_shape_, 0, {});
130 }
131 enable_prefetch_cache_pipeline_ = distributed::EmbeddingCacheTableManager::GetInstance().enable_pipeline();
132 #endif
133 #ifdef ENABLE_DUMP_IR
134 md_channel_info_ = std::make_shared<MDChannelInfo>(channel_name_);
135 #endif
136 }
137
~DataQueueOp()138 DataQueueOp::~DataQueueOp() {
139 #ifdef ENABLE_DUMP_IR
140 // BFS iter execution tree to get send epoch from EpochControl Op
141 std::vector<std::shared_ptr<DatasetOp>> child_node = this->Children();
142 size_t node_index = 0;
143 int32_t num_epochs = 0;
144 while (child_node.size() != 0 && node_index < child_node.size()) {
145 auto node = child_node[node_index];
146 if (node->Name() == kEpochCtrlOp) {
147 EpochCtrlOp *op = dynamic_cast<EpochCtrlOp *>(node.get());
148 if (op != nullptr) {
149 num_epochs = op->NumEpochs();
150 break;
151 }
152 }
153 auto child_child_node = node->Children();
154 if (!child_child_node.empty()) {
155 (void)std::copy(child_child_node.begin(), child_child_node.end(), std::back_inserter(child_node));
156 }
157 ++node_index;
158 }
159
160 // won't print rdr if call stop_send manually or send infinite epoch
161 std::string rdr_msg = md_channel_info_->ToFormatString();
162 if (!send_finished_ && !rdr_msg.empty() && num_epochs != -1) {
163 MS_LOG(WARNING) << rdr_msg;
164 }
165 MS_LOG(INFO) << "Release the DataQueueOp, channel name: " << channel_name_;
166 #endif
167 }
168
ReleaseData(void * addr,int32_t worker_id)169 void DataQueueOp::ReleaseData(void *addr, int32_t worker_id) {
170 if (addr != nullptr && worker_id >= 0 && worker_id < pool_.size()) {
171 pool_[worker_id]->Deallocate(addr);
172 }
173 }
174
EoeReceived(int32_t)175 Status DataQueueOp::EoeReceived(int32_t) {
176 state_ = OpState::kDeOpIdle;
177 return Status::OK();
178 }
179
FilterMetadata(TensorRow * row) const180 Status DataQueueOp::FilterMetadata(TensorRow *row) const {
181 std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
182 TensorRow output;
183 TensorRow tmp = *row;
184 std::vector<size_t> to_keep_indices;
185 for (auto column : current_name_id_map) {
186 std::string column_name = column.first;
187 // Need to filter meta column start with kDftMetaColumnPrefix
188 size_t pos = column_name.find(kDftMetaColumnPrefix);
189 if (pos != std::string::npos && pos == 0) {
190 continue;
191 }
192 to_keep_indices.push_back(column.second);
193 }
194 if (to_keep_indices.size() == 0) {
195 std::string err_msg = "No effective column found, maybe all columns are meta column and will be filtered. ";
196 err_msg += "If you want to output meta column please rename column name to a new one which is not start with ";
197 err_msg += "\"" + std::string(kDftMetaColumnPrefix) + "\"";
198 RETURN_STATUS_UNEXPECTED(err_msg);
199 }
200 std::sort(to_keep_indices.begin(), to_keep_indices.end());
201 (void)std::transform(to_keep_indices.begin(), to_keep_indices.end(), std::back_inserter(output),
202 [&tmp](const auto &it) { return std::move(tmp[it]); });
203 *row = std::move(output);
204 return Status::OK();
205 }
206
CheckExceptions(const TensorRow & row) const207 Status DataQueueOp::CheckExceptions(const TensorRow &row) const {
208 // this method checks if the row meets the conditions to be sent to TDT
209 for (const auto &item : row) {
210 CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(),
211 "Invalid datatype, cannot send string, or Python dict to device.");
212 CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Invalid data, the data send to device is null.");
213 }
214 return Status::OK();
215 }
216
operator ()()217 Status DataQueueOp::operator()() {
218 #ifndef ENABLE_SECURITY
219 RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Detect first batch",
220 std::bind(&DataQueueOp::DetectFirstBatch, this), nullptr, id()));
221 #endif
222 TaskManager::FindMe()->Post();
223 child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
224
225 #ifdef ENABLE_DUMP_IR
226 if (md_channel_info_ == nullptr) {
227 RETURN_STATUS_UNEXPECTED("[Internal ERROR] RDR module init failed.");
228 }
229 #endif
230 if (device_type_ == DeviceType::Ascend) {
231 #ifdef WITH_BACKEND
232 if (create_data_info_queue_) {
233 // This place has a race condition with GetDataInfo, so the first one
234 // arrive here will do the initialize work.
235 {
236 std::unique_lock<std::mutex> lock(data_info_mutex_);
237 if (data_info_queue_ptr_ == nullptr) {
238 data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
239 RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
240 }
241 }
242 }
243 RETURN_IF_NOT_OK(SendDataToAscend());
244
245 #endif
246 } else if (device_type_ == DeviceType::GPU) {
247 #ifdef WITH_BACKEND
248 if (create_data_info_queue_) {
249 // This place has a race condition with GetDataInfo, so the first one
250 // arrive here will do the initialize work.
251 {
252 std::unique_lock<std::mutex> lock(data_info_mutex_);
253 if (data_info_queue_ptr_ == nullptr) {
254 data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
255 RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
256 }
257 }
258 }
259 RETURN_IF_NOT_OK(SendDataToGPU());
260 #endif
261 } else if (device_type_ == DeviceType::CPU) {
262 RETURN_IF_NOT_OK(SendDataToCPU());
263 }
264
265 return Status::OK();
266 }
267
PushPrefetchDataToGPU()268 Status DataQueueOp::PushPrefetchDataToGPU() {
269 #ifdef WITH_BACKEND
270 TaskManager::FindMe()->Post();
271 const auto &index_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name_).second;
272 MS_EXCEPTION_IF_NULL(index_data_queue);
273 bool eof_flag = false;
274 int64_t send_batch = 0;
275
276 bool is_profiling_enable = GlobalContext::profiling_manager()->IsProfilingEnable(tree_);
277 uint64_t push_cost;
278
279 while (!eof_flag && !DataQueueManager::GetInstance().IsClosed() && !device::DataQueueMgr::GetInstance().IsClosed() &&
280 !TaskManager::FindMe()->Interrupted()) {
281 IndexDataInfo *data = index_data_queue->Pop();
282 if (DataQueueManager::GetInstance().IsClosed() || device::DataQueueMgr::GetInstance().IsClosed()) {
283 // Terminate abnormally.
284 break;
285 }
286 MS_EXCEPTION_IF_NULL(data);
287
288 eof_flag = data->end_of_file_;
289 if (data->data_ != nullptr) {
290 MS_EXCEPTION_IF_NULL(data->items_);
291
292 auto status = RetryPushData(*(data->items_), is_profiling_enable, &push_cost);
293 delete data->items_;
294 delete data;
295
296 RETURN_IF_NOT_OK(status);
297
298 ++send_batch;
299 if (total_batch_ > 0 && send_batch >= total_batch_) {
300 break;
301 }
302 } else {
303 delete data;
304 }
305 }
306 #endif
307 return Status::OK();
308 }
309
PushDataToGPUCacheQueue(std::vector<device::DataQueueItem> && data_items)310 Status DataQueueOp::PushDataToGPUCacheQueue(std::vector<device::DataQueueItem> &&data_items) {
311 #ifdef WITH_BACKEND
312 std::vector<device::DataQueueItem> *items = new std::vector<device::DataQueueItem>(std::move(data_items));
313 MS_EXCEPTION_IF_NULL(items);
314 if (items->empty()) {
315 MS_LOG(EXCEPTION) << "The data queue item is empty.";
316 }
317 const auto &id_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name_).first;
318 MS_EXCEPTION_IF_NULL(id_data_queue);
319 IdDataInfo *data = new IdDataInfo(items->at(0).data_ptr, items->at(0).data_len, items, false, false);
320
321 MS_EXCEPTION_IF_NULL(data);
322 id_data_queue->Push(data);
323 #endif
324
325 return Status::OK();
326 }
327
PushPrefetchDataToAscend()328 Status DataQueueOp::PushPrefetchDataToAscend() {
329 #ifdef WITH_BACKEND
330 TaskManager::FindMe()->Post();
331 const auto &index_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name_).second;
332 MS_EXCEPTION_IF_NULL(index_data_queue);
333 MS_EXCEPTION_IF_NULL(ascend_data_queue_);
334 bool eof_flag = false;
335 int64_t send_batch = 0;
336 while (!eof_flag && !DataQueueManager::GetInstance().IsClosed() && ascend_data_queue_->IsOpen()) {
337 IndexDataInfo *data = index_data_queue->Pop();
338 if (DataQueueManager::GetInstance().IsClosed() || !ascend_data_queue_->IsOpen()) {
339 // Terminate abnormally.
340 break;
341 }
342 MS_EXCEPTION_IF_NULL(data);
343 device::DataQueueStatus status;
344
345 bool eoe_flag = data->end_of_epoch_;
346 eof_flag = data->end_of_file_;
347 if (data->data_ != nullptr) {
348 MS_EXCEPTION_IF_NULL(data->items_);
349
350 if (ascend_data_queue_->QueueType() == "Ascend_MBUF") {
351 size_t batch_data_len =
352 std::accumulate(data->items_->begin(), data->items_->end(), static_cast<size_t>(0),
353 [](size_t acc, const DataQueueItem &item) { return acc + item.data_len; });
354 RETURN_IF_NOT_OK(WaitForAscendQueue(batch_data_len));
355 }
356
357 status = ascend_data_queue_->Push(*(data->items_));
358 for (auto &item : *(data->items_)) {
359 MS_EXCEPTION_IF_NULL(item.data_ptr);
360 free(item.data_ptr);
361 }
362 delete data->items_;
363 delete data;
364 if (status != device::DataQueueStatus::SUCCESS) {
365 if (stop_send_) {
366 MS_LOG(INFO) << "stop_send received";
367 return Status::OK();
368 }
369 RETURN_STATUS_ERROR(
370 StatusCode::kMDTDTPushFailure,
371 "TDT Push data into device Failed, check the first error or TraceBack first, more checking advises are: "
372 "1) if training is not ready, error might raised by network computing operator or environment configuration. "
373 "2) other cases, checking info level log or search this error in mindspore's FAQ for detail solution.");
374 }
375 ++send_batch;
376 if (total_batch_ > 0 && send_batch >= total_batch_) {
377 break;
378 }
379 } else {
380 delete data;
381 if (send_epoch_end_ && eoe_flag) {
382 // empty data
383 status = ascend_data_queue_->Push({});
384 bool break_loop = false;
385 RETURN_IF_NOT_OK(CheckPushStatus(status, stop_send_, &send_finished_, &break_loop));
386 if (break_loop) {
387 break;
388 }
389 }
390 }
391 }
392 #endif
393
394 return Status::OK();
395 }
396
PushDataToAscendCacheQueue(const TensorRow & curr_row)397 Status DataQueueOp::PushDataToAscendCacheQueue(const TensorRow &curr_row) {
398 #ifdef WITH_BACKEND
399 std::vector<device::DataQueueItem> *items =
400 new std::vector<device::DataQueueItem>(DeepCopyConvertTensorRowToDataQueueItem(curr_row));
401 MS_EXCEPTION_IF_NULL(items);
402 if (items->empty()) {
403 MS_LOG(EXCEPTION) << "The data queue item is empty.";
404 }
405 const auto &id_data_queue = DataQueueManager::GetInstance().GetDataQueue(channel_name_).first;
406 MS_EXCEPTION_IF_NULL(id_data_queue);
407 IdDataInfo *data =
408 new IdDataInfo(items->at(0).data_ptr, items->at(0).data_len, items, curr_row.eoe(), curr_row.eof());
409
410 MS_EXCEPTION_IF_NULL(data);
411 id_data_queue->Push(data);
412
413 if (create_data_info_queue_) {
414 DATA_INFO data_info;
415 (void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
416 [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
417 RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
418 }
419 #endif
420 return Status::OK();
421 }
422
WaitForAscendQueue(size_t batch_data_len)423 Status DataQueueOp::WaitForAscendQueue(size_t batch_data_len) {
424 // Queue control logic for mbuf in host, to prevent from hang/exit abnormally
425 // case 1: If mbuf queue memory + next row memory < 2G then continue send, else suspend;
426 // case 2: Based on case 1, if element nums in mbuf < max_queue_size then continue send, else suspend;
427 // case 3: If row memory >= 1G, can only send 1 row each time, queue_size will always in [0, 1];
428 // note:
429 // why need queue control: acltdtSendTensor will hang when queue is full, need to break this thread by ourselves
430 // how about dynamic shape: yes, memory_per_batch_ collect memory of rows in different shapes.
431 // how about row too large(>2G): we can promise the first row will be sent and hang in this while, but we dont
432 // know if the device will out of memory. If not oom, send next row, otherwise device returns error.
433
434 // Calculate the memory of next row before sending
435 size_t queue_size = ascend_data_queue_->QueryQueueSize();
436 double row_memory = batch_data_len / 1024. / 1024. / 1024.;
437 memory_per_batch_.push_back(row_memory);
438
439 const double max_queue_memory = 2.;
440 const size_t max_queue_size = 100;
441 const int64_t send_interval = 1000;
442 RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "WaitForAscend"));
443 while ((row_memory + CalMbufQueueMemory(queue_size) >= max_queue_memory || queue_size >= max_queue_size) &&
444 queue_size != 0) {
445 RETURN_IF_INTERRUPTED();
446 MS_LOG(DEBUG) << "Mbuf queue size: " << queue_size << ", max queue limit: " << max_queue_size << ". "
447 << "Next row memory: " << row_memory << ", Mbuf memory: " << CalMbufQueueMemory(queue_size);
448
449 queue_size = ascend_data_queue_->QueryQueueSize();
450 std::this_thread::sleep_for(std::chrono::microseconds(send_interval));
451 }
452 RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "WaitForAscend"));
453 return Status::OK();
454 }
455
SendDataToAscend()456 Status DataQueueOp::SendDataToAscend() {
457 MS_LOG(INFO) << "Device queue, sending data to Ascend.";
458 if (enable_prefetch_cache_pipeline_) {
459 RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
460 "Push prefetch data to ascend queue", std::bind(&DataQueueOp::PushPrefetchDataToAscend, this), nullptr, id()));
461 }
462 #ifndef ENABLE_SECURITY
463 uint64_t batch_start_time = 0;
464 uint64_t end_time = 0;
465 uint64_t batch_record_start = 0;
466 uint64_t batch_record_end = 0;
467 #endif
468 int64_t send_batch = 0;
469 int32_t tdt_cost = 0;
470 #ifndef ENABLE_SECURITY
471 int32_t connector_size = 0;
472 int32_t connector_capacity = 0;
473 #endif
474 bool is_break_loop = false;
475
476 std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
477 int64_t sending_num = cfg->sending_batches(); // Get the current sending_num
478
479 #ifndef ENABLE_SECURITY
480 std::shared_ptr<DeviceQueueTracing> profiling_node;
481 bool is_profiling_enable = GlobalContext::profiling_manager()->IsProfilingEnable(tree_);
482 if (is_profiling_enable) {
483 std::shared_ptr<Tracing> node;
484 RETURN_IF_NOT_OK(GlobalContext::profiling_manager()->GetTracingNode(kDeviceQueueTracingName, &node));
485 profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
486 batch_start_time = ProfilingTime::GetCurMilliSecond();
487 connector_capacity = ChildOpConnectorCapacity();
488 }
489 #else
490 bool is_profiling_enable = false;
491 #endif
492 #ifdef ENABLE_DUMP_IR
493 RETURN_IF_NOT_OK(md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()));
494 RETURN_IF_NOT_OK(md_channel_info_->RecordPreprocessBatch(0));
495 #endif
496 #ifndef ENABLE_SECURITY
497 batch_record_start = ProfilingTime::GetCurMilliSecond();
498 #endif
499 TensorRow curr_row;
500 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
501 first_fetch_flag_ = true;
502
503 MS_LOG(INFO) << "Begin to send data to device, channel name: " << channel_name_;
504
505 while (!curr_row.eof() && !is_break_loop) {
506 SendInfo send_info_per_epoch;
507 send_info_per_epoch.epoch = op_current_epochs_ + 1;
508 send_summary_.push_back(send_info_per_epoch);
509 while (!curr_row.eoe() && !is_break_loop) {
510 RETURN_IF_NOT_OK(FilterMetadata(&curr_row));
511 RETURN_IF_NOT_OK(CheckExceptions(curr_row));
512 WaitContinueSignal();
513 #ifdef ENABLE_DUMP_IR
514 RETURN_IF_NOT_OK(md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()));
515 RETURN_IF_NOT_OK(md_channel_info_->RecordPreprocessBatch(send_batch));
516 RETURN_IF_NOT_OK(md_channel_info_->RecordPushStartTime());
517 if (send_batch == 0) {
518 // record the push start time for first batch
519 RETURN_IF_NOT_OK(md_channel_info_->RecordPushFirstStartTime());
520 }
521 #endif
522 #ifndef ENABLE_SECURITY
523 DetectPerBatchTime(&batch_record_start, &batch_record_end);
524 #endif
525 PrintBeginInfoWhenFirstBatch(first_push_flag_);
526 // when training stopped, handle might have been destroyed immediately
527 if (ascend_data_queue_ != nullptr && !ascend_data_queue_->IsOpen()) {
528 MS_LOG(WARNING) << "Thread has already been terminated.";
529 is_break_loop = true;
530 continue;
531 }
532 RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "PushToAscend"));
533 if (!enable_prefetch_cache_pipeline_) {
534 #ifdef ENABLE_DUMP_IR
535 if (ascend_data_queue_->QueueType() == "Ascend_MBUF") {
536 RETURN_IF_NOT_OK(md_channel_info_->RecordDeviceQueue(ascend_data_queue_->QueryQueueSize()));
537 }
538 MS_LOG(INFO) << md_channel_info_->ToFormatString();
539 #endif
540 RETURN_IF_NOT_OK(SendRowToTdt(curr_row, is_profiling_enable, &tdt_cost));
541 } else {
542 RETURN_IF_NOT_OK(PushDataToAscendCacheQueue(curr_row));
543 }
544 #ifdef ENABLE_DUMP_IR
545 if (send_batch == 0) {
546 // record the push end time for first batch
547 RETURN_IF_NOT_OK(md_channel_info_->RecordPushFirstEndTime());
548 }
549 #endif
550 RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "PushToAscend", {{"TensorRowFlags", curr_row.FlagName()}}));
551 PrintEndInfoWhenFirstBatch(&first_push_flag_);
552 #ifndef ENABLE_SECURITY
553 ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
554 connector_capacity, connector_size);
555 batch_record_start = ProfilingTime::GetCurMilliSecond();
556 #endif
557 send_batch++;
558 MS_LOG(INFO) << "Have sent " << send_batch << " batch(es) to device, channel name: " << channel_name_;
559 #ifdef ENABLE_DUMP_IR
560 RETURN_IF_NOT_OK(md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()));
561 RETURN_IF_NOT_OK(md_channel_info_->RecordPreprocessBatch(send_batch));
562 RETURN_IF_NOT_OK(md_channel_info_->RecordPushEndTime());
563 #endif
564
565 if (total_batch_ > 0 && send_batch >= total_batch_) {
566 is_break_loop = true;
567 break;
568 }
569
570 // wait when sending num is not 0, and sending num no larger than already sending batch
571 LimitSendingBatches(send_batch, &sending_num, cfg);
572
573 #ifndef ENABLE_SECURITY
574 RecordProfilingData(is_profiling_enable, false, &connector_size, &connector_capacity, &send_batch);
575 #endif
576 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
577 #ifndef ENABLE_SECURITY
578 uint64_t batch_fetch_end = ProfilingTime::GetCurMilliSecond();
579 #endif
580 if (ascend_data_queue_->QueueType() == "Ascend_MBUF") {
581 if (!enable_prefetch_cache_pipeline_) {
582 RETURN_IF_NOT_OK(WaitForAscendQueue(static_cast<size_t>(curr_row.SizeInBytes())));
583 }
584 }
585 #ifndef ENABLE_SECURITY
586 uint64_t queue_wait_end = ProfilingTime::GetCurMilliSecond();
587 // Skip the time looping in the mbuf queue control, FetchNextTensorRow time is what we need
588 batch_record_start = batch_record_start + (queue_wait_end - batch_fetch_end);
589 #endif
590 }
591
592 RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "PushToAscend"));
593 // send epoch end flag: ACL_TENSOR_DATA_END_OF_SEQUENCE to tdt
594 RETURN_IF_NOT_OK(SendEpochEndToAscend(curr_row, is_profiling_enable, &tdt_cost, &is_break_loop));
595 RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "PushToAscend",
596 {{"TensorRowFlags", TensorRow(TensorRow::kFlagEOE).FlagName()}}));
597 UpdateRepeatAndEpochCounter();
598 #ifndef ENABLE_SECURITY
599 RecordProfilingData(is_profiling_enable, true, &connector_size, &connector_capacity, &send_batch);
600 #endif
601 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
602 }
603
604 if (enable_prefetch_cache_pipeline_) {
605 PushFileEndToQueue(channel_name_);
606 }
607
608 // now we use this flag to judge whether exception raised.
609 if (stop_send_ || !TaskManager::FindMe()->Interrupted()) {
610 send_finished_ = true;
611 }
612 tree_->SetFinished();
613 MS_LOG(INFO) << "ExecutionTree finished. Device queue sent number of batches: " << send_batch;
614
615 return Status::OK();
616 }
617
RecordProfilingData(bool is_profiling_enable,bool end_of_epoch,int32_t * connector_size,int32_t * connector_capacity,const int64_t * send_batch) const618 void DataQueueOp::RecordProfilingData(bool is_profiling_enable, bool end_of_epoch, int32_t *connector_size,
619 int32_t *connector_capacity, const int64_t *send_batch) const {
620 if (is_profiling_enable) {
621 *connector_size = ChildOpConnectorSize();
622 *connector_capacity = ChildOpConnectorCapacity();
623 }
624 if (end_of_epoch) {
625 tree_->SetEpochEnd();
626 GlobalContext::profiling_manager()->RecordEndOfEpoch(*send_batch);
627 }
628 }
629
CalMbufQueueMemory(size_t realtime_queue_size)630 double DataQueueOp::CalMbufQueueMemory(size_t realtime_queue_size) {
631 while (memory_per_batch_.size() > realtime_queue_size) {
632 memory_per_batch_.pop_front();
633 }
634 return std::accumulate(memory_per_batch_.begin(), memory_per_batch_.end(), 0.);
635 }
636
SendEpochEndToAscend(const TensorRow & curr_row,const bool & is_profiling_enable,int32_t * tdt_cost,bool * is_break_loop)637 Status DataQueueOp::SendEpochEndToAscend(const TensorRow &curr_row, const bool &is_profiling_enable, int32_t *tdt_cost,
638 bool *is_break_loop) {
639 RETURN_UNEXPECTED_IF_NULL(tdt_cost);
640 RETURN_UNEXPECTED_IF_NULL(is_break_loop);
641 if (curr_row.eoe() && send_epoch_end_ && ascend_data_queue_->IsOpen()) {
642 TensorRow dummy_row;
643 if (!enable_prefetch_cache_pipeline_) {
644 #ifndef ENABLE_SECURITY
645 double start_time = 0;
646 if (is_profiling_enable) {
647 start_time = ProfilingTime::GetCurMilliSecond();
648 }
649 #endif
650 auto status = ascend_data_queue_->Push({});
651 #ifndef ENABLE_SECURITY
652 if (is_profiling_enable) {
653 double end_time = ProfilingTime::GetCurMilliSecond();
654 RETURN_UNEXPECTED_IF_NULL(tdt_cost);
655 *tdt_cost = static_cast<int32_t>(end_time - start_time);
656 }
657 #endif
658
659 RETURN_IF_NOT_OK(CheckPushStatus(status, stop_send_, &send_finished_, is_break_loop));
660 MS_LOG(INFO) << "an epoch has already sent, now stop send data.";
661 } else {
662 PushEpochEndToQueue(channel_name_);
663 }
664 stop_send_ = true;
665 }
666 return Status::OK();
667 }
668
WaitContinueSignal() const669 void DataQueueOp::WaitContinueSignal() const {
670 while (stop_send_ && ascend_keep_waiting_) {
671 MS_LOG(DEBUG) << "stop_send flag is set, waiting for continue signal...";
672 std::this_thread::sleep_for(std::chrono::microseconds(100));
673 }
674 }
675
LimitSendingBatches(int64_t send_batch,int64_t * sending_num,const std::shared_ptr<ConfigManager> & cfg) const676 void DataQueueOp::LimitSendingBatches(int64_t send_batch, int64_t *sending_num,
677 const std::shared_ptr<ConfigManager> &cfg) const {
678 while (send_batch >= *sending_num) {
679 *sending_num = cfg->sending_batches();
680 if (*sending_num == 0) {
681 break;
682 }
683 std::this_thread::sleep_for(std::chrono::milliseconds(10));
684 MS_LOG(INFO) << "Wait for 10 milliseconds, as needed send batch is: " << *sending_num
685 << ", and current sending batch is:" << send_batch;
686 }
687 }
688
SendRowToTdt(TensorRow curr_row,bool is_profiling_enable,int32_t * tdt_cost)689 Status DataQueueOp::SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost) {
690 std::vector<device::DataQueueItem> items = ConvertTensorRowToDataQueueItem(curr_row);
691 #ifndef ENABLE_SECURITY
692 double start_time = 0;
693 if (is_profiling_enable) {
694 start_time = ProfilingTime::GetCurMilliSecond();
695 }
696 #endif
697 auto status = ascend_data_queue_->Push(items);
698 #ifndef ENABLE_SECURITY
699 if (is_profiling_enable) {
700 double end_time = ProfilingTime::GetCurMilliSecond();
701 RETURN_UNEXPECTED_IF_NULL(tdt_cost);
702 *tdt_cost = static_cast<int32_t>(end_time - start_time);
703 }
704 #endif
705 if (status != device::DataQueueStatus::SUCCESS) {
706 if (stop_send_) {
707 MS_LOG(INFO) << "stop_send received";
708 return Status::OK();
709 }
710 RETURN_STATUS_ERROR(
711 StatusCode::kMDTDTPushFailure,
712 "TDT Push data into device Failed, check the first error or TraceBack first, more checking advises are: "
713 "1) if training is not ready, error might raised by network computing operator or environment configuration. "
714 "2) other cases, checking info level log or search this error in mindspore's FAQ for detail solution.");
715 }
716 if (create_data_info_queue_) {
717 DATA_INFO data_info;
718 (void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
719 [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
720 RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(std::move(data_info)));
721 }
722 return Status::OK();
723 }
724
CheckPushStatus(DataQueueStatus status,bool stop_send,bool * send_finished,bool * is_break_loop)725 Status DataQueueOp::CheckPushStatus(DataQueueStatus status, bool stop_send, bool *send_finished, bool *is_break_loop) {
726 if (status != DataQueueStatus::SUCCESS) {
727 if (stop_send) {
728 *send_finished = true;
729 MS_LOG(INFO) << "stop_send received";
730 return Status::OK();
731 }
732 // when training stopped, handle might have been destroyed immediately
733 if (!ascend_data_queue_->IsOpen()) {
734 *is_break_loop = true;
735 MS_LOG(WARNING) << "Thread has already been terminated.";
736 return Status::OK();
737 }
738 RETURN_STATUS_ERROR(
739 StatusCode::kMDTDTPushFailure,
740 "TDT Push data into device Failed, check the first error or TraceBack first, more checking advises are: "
741 "1) if training is not ready, error might raised by network computing operator or environment configuration. "
742 "2) other cases, checking info level log or search this error in mindspore's FAQ for detail solution.");
743 }
744 return Status::OK();
745 }
746
GetDataInfo(DATA_INFO * data_info)747 Status DataQueueOp::GetDataInfo(DATA_INFO *data_info) {
748 #ifdef WITH_BACKEND
749 RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
750 if (!create_data_info_queue_) {
751 RETURN_STATUS_UNEXPECTED("[Internal ERROR] DataInfo queue is not created.");
752 }
753 // This place has a race condition with operator(), so the first one
754 // arrive here will do the initialize work.
755 {
756 std::unique_lock<std::mutex> lock(data_info_mutex_);
757 if (data_info_queue_ptr_ == nullptr) {
758 data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
759 RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
760 }
761 }
762 RETURN_IF_NOT_OK(data_info_queue_ptr_->PopFront(data_info));
763 #endif
764 return Status::OK();
765 }
766
GetMbufQueueSize(size_t * queue_size)767 Status DataQueueOp::GetMbufQueueSize(size_t *queue_size) {
768 #ifdef WITH_BACKEND
769 if (TaskManager::FindMe()->Interrupted()) {
770 RETURN_STATUS_ERROR(StatusCode::kMDTDTPushFailure,
771 "DataQueueOp thread had been interrupted, please check for other error messages.");
772 }
773 if (device_type_ == DeviceType::Ascend && ascend_data_queue_->QueueType() == "Ascend_MBUF") {
774 *queue_size = ascend_data_queue_->QueryQueueSize();
775 } else {
776 *queue_size = 1;
777 }
778 #endif
779 return Status::OK();
780 }
781
GetSendInfo()782 std::vector<std::vector<double>> DataQueueOp::GetSendInfo() {
783 std::vector<std::vector<double>> send_info_per_epoch;
784 (void)std::transform(send_summary_.begin(), send_summary_.end(), std::back_inserter(send_info_per_epoch),
785 [](const SendInfo &send_info) -> std::vector<double> {
786 return std::vector<double>{send_info.epoch, send_info.fetch_data_num,
787 send_info.first_data_time, send_info.fetch_data_time};
788 });
789 // history message control
790 constexpr auto kMaxInfoSize = 5;
791 while (send_info_per_epoch.size() > kMaxInfoSize) {
792 send_info_per_epoch.erase(send_info_per_epoch.begin());
793 }
794 return send_info_per_epoch;
795 }
796
SetThreadDevice()797 Status DataQueueOp::SetThreadDevice() {
798 #ifdef WITH_BACKEND
799 (void)device::DataQueueMgr::GetInstance().SetThreadDevice(channel_name_);
800 #endif
801 return Status::OK();
802 }
803
CreateDynamicDataQueue()804 Status DataQueueOp::CreateDynamicDataQueue() {
805 #ifdef WITH_BACKEND
806 if (dynamic_shape_) {
807 auto ret = device::DataQueueMgr::GetInstance().CreateDynamicBufQueue(channel_name_, kDynamicHostQueueCapacity);
808 if (ret != DataQueueStatus::SUCCESS && ret != DataQueueStatus::QUEUE_EXIST) {
809 RETURN_STATUS_ERROR(StatusCode::kMEFailed, "Create dynamic data queue failed");
810 }
811 }
812 #endif
813 return Status::OK();
814 }
815
LaunchParallelCopyThread()816 Status DataQueueOp::LaunchParallelCopyThread() {
817 #ifdef WITH_BACKEND
818 RETURN_UNEXPECTED_IF_NULL(tree_);
819 // Every thread use cuda api should SetThreadDevice
820 RETURN_IF_NOT_OK(SetThreadDevice());
821 // CircularPool may not safe under multi-threads scenario, so one worker with one pool
822 for (int i = 0; i < num_workers_; i++) {
823 std::shared_ptr<MemoryPool> pool;
824 RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
825 RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(
826 &pool, -1, kDeviceQueGpuThreadMemory, false,
827 MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET) == kGPUDevice));
828 pool_.push_back(pool);
829 }
830 gpu_connector_ = std::make_unique<GpuConnector>(num_workers_, 1, queue_capacity_);
831 receive_queues_.Init(num_workers_, queue_capacity_);
832 RETURN_IF_NOT_OK(receive_queues_.Register(tree_->AllTasks()));
833 RETURN_IF_NOT_OK(tree_->LaunchWorkers(static_cast<int>(num_workers_),
834 std::bind(&DataQueueOp::WorkerEntry, this, std::placeholders::_1),
835 Name() + "::WorkerEntry", id()));
836 RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Push data to GPU queue",
837 std::bind(&DataQueueOp::PushDataToGPU, this), nullptr, id()));
838 if (enable_prefetch_cache_pipeline_) {
839 RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
840 "Push prefetch data to GPU queue", std::bind(&DataQueueOp::PushPrefetchDataToGPU, this), nullptr, id()));
841 }
842
843 #endif
844 return Status::OK();
845 }
846
NoExceptionRaised() const847 bool DataQueueOp::NoExceptionRaised() const {
848 #ifdef WITH_BACKEND
849 return !TaskManager::FindMe()->Interrupted() && !device::DataQueueMgr::GetInstance().IsClosed();
850 #else
851 return !TaskManager::FindMe()->Interrupted();
852 #endif
853 }
854
PushDataToGPU()855 Status DataQueueOp::PushDataToGPU() {
856 #ifdef WITH_BACKEND
857 RETURN_UNEXPECTED_IF_NULL(tree_);
858 // Every thread use cuda api should SetThreadDevice
859 RETURN_IF_NOT_OK(SetThreadDevice());
860 TaskManager::FindMe()->Post();
861 #ifndef ENABLE_SECURITY
862 uint64_t batch_start_time = 0;
863 uint64_t end_time = 0;
864 uint64_t push_cost = 0;
865 std::shared_ptr<DeviceQueueTracing> profiling_node;
866 bool is_profiling_enable = GlobalContext::profiling_manager()->IsProfilingEnable(tree_);
867 if (is_profiling_enable) {
868 std::shared_ptr<Tracing> node;
869 RETURN_IF_NOT_OK(GlobalContext::profiling_manager()->GetTracingNode(kDeviceQueueTracingName, &node));
870 profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
871 batch_start_time = ProfilingTime::GetCurMilliSecond();
872 }
873 #endif
874 #ifdef ENABLE_DUMP_IR
875 md_channel_info_->RecordBatchQueue(gpu_connector_->size());
876 md_channel_info_->RecordPreprocessBatch(0);
877 #endif
878 GpuConnectorItem item;
879 RETURN_IF_NOT_OK(gpu_connector_->Pop(0, &item));
880 auto items = std::move(item.data_item);
881 bool eoe_flag = item.eoe_flag;
882 int64_t send_batch = 0;
883 auto release_function = std::bind(&DataQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
884 auto ret = device::DataQueueMgr::GetInstance().Open(channel_name_, release_function);
885 if (ret != DataQueueStatus::SUCCESS) {
886 RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to open channel for sending data.");
887 }
888
889 while (!(items.empty() && !eoe_flag) && !device::DataQueueMgr::GetInstance().IsClosed()) {
890 if (!eoe_flag) {
891 #ifdef ENABLE_DUMP_IR
892 md_channel_info_->RecordBatchQueue(gpu_connector_->size());
893 md_channel_info_->RecordPreprocessBatch(send_batch);
894 md_channel_info_->RecordPushStartTime();
895 #endif
896 // Data prefetch only when PS mode enables cache.
897 RETURN_IF_NOT_OK(CollectOpInfoStart(this->NameWithID(), "PushToGPU"));
898 if (!enable_prefetch_cache_pipeline_) {
899 RETURN_IF_NOT_OK(RetryPushData(items, is_profiling_enable, &push_cost));
900 } else {
901 PushDataToGPUCacheQueue(std::move(items));
902 }
903 RETURN_IF_NOT_OK(CollectOpInfoEnd(this->NameWithID(), "PushToGPU", {{"TensorRowFlags", "Data"}}));
904 #ifndef ENABLE_SECURITY
905 ProfilingRecorder(is_profiling_enable, profiling_node, send_batch, push_cost, &batch_start_time, &end_time,
906 gpu_connector_->capacity(), gpu_connector_->size());
907 #endif
908 send_batch++;
909 MS_LOG(INFO) << "Have sent " << send_batch << " batch(es) to device, channel name: " << channel_name_;
910 #ifdef ENABLE_DUMP_IR
911 md_channel_info_->RecordBatchQueue(gpu_connector_->size());
912 md_channel_info_->RecordPreprocessBatch(send_batch);
913 md_channel_info_->RecordPushEndTime();
914 #endif
915 if (total_batch_ > 0 && send_batch >= total_batch_) {
916 break;
917 }
918 } else {
919 #ifndef ENABLE_SECURITY
920 if (is_profiling_enable) {
921 tree_->SetEpochEnd();
922 GlobalContext::profiling_manager()->RecordEndOfEpoch(send_batch);
923 }
924 #endif
925 }
926 if (NoExceptionRaised()) {
927 auto rc = gpu_connector_->Pop(0, &item);
928 items = std::move(item.data_item);
929 eoe_flag = item.eoe_flag;
930 // If the batches send by dataset are more than gpu calculate, gpu will core for no signal notify.
931 if (rc.IsError()) {
932 device::DataQueueMgr::GetInstance().Close(channel_name_);
933 device::DataQueueMgr::GetInstance().CloseConfirm();
934 return rc;
935 }
936 } else {
937 break;
938 }
939 }
940
941 if (enable_prefetch_cache_pipeline_) {
942 // Send eof to cache queue
943 PushFileEndToQueue(channel_name_);
944 }
945
946 // now we use this flag to judge whether exception raised.
947 if (NoExceptionRaised()) {
948 send_finished_ = true;
949 }
950 tree_->SetFinished();
951 MS_LOG(INFO) << "ExecutionTree finished. Device queue sent number of batches: " << send_batch;
952
953 device::DataQueueMgr::GetInstance().Close(channel_name_);
954 device::DataQueueMgr::GetInstance().CloseConfirm();
955 return Status::OK();
956 }
957
958 // WorkEntry of DataQueueOp just do multi_threads memcpy for performance optimization.
WorkerEntry(int32_t worker_id)959 Status DataQueueOp::WorkerEntry(int32_t worker_id) {
960 // Every thread use cuda api should SetThreadDevice
961 RETURN_IF_NOT_OK(SetThreadDevice());
962 TaskManager::FindMe()->Post();
963 TensorRow current_row;
964 uint32_t batch_num = 0;
965 RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(¤t_row));
966 while (!current_row.quit() && !device::DataQueueMgr::GetInstance().IsClosed()) {
967 GpuConnectorItem connector_item = {{}, current_row.eoe()};
968 if (!connector_item.eoe_flag) {
969 std::vector<device::DataQueueItem> items;
970 for (auto &i : current_row) {
971 device::DataQueueItem data_item;
972 data_item.data_len = static_cast<size_t>(i->SizeInBytes());
973 data_item.shapes = i->shape().AsVector();
974 data_item.data_ptr = nullptr;
975 data_item.worker_id = worker_id;
976 items.push_back(data_item);
977 }
978
979 RETURN_IF_NOT_OK(MallocForGPUData(&items, current_row, worker_id));
980 connector_item.data_item = std::move(items);
981 batch_num++;
982 } else {
983 MS_LOG(INFO) << "EOE Detected";
984 }
985 RETURN_IF_NOT_OK(gpu_connector_->Add(worker_id, std::move(connector_item)));
986 RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(¤t_row));
987 }
988
989 MS_LOG(INFO) << "Device queue worker id " << worker_id << " processed number of batches: " << batch_num;
990 // Add empty data_item vector with eoe_flag=false as quit flag.
991 GpuConnectorItem connector_item = {{}, false};
992 RETURN_IF_NOT_OK(gpu_connector_->Add(worker_id, std::move(connector_item)));
993 #endif
994 return Status::OK();
995 }
996
SendDataToGPU()997 Status DataQueueOp::SendDataToGPU() {
998 #ifdef WITH_BACKEND
999 RETURN_IF_NOT_OK(LaunchParallelCopyThread());
1000 MS_LOG(INFO) << "Device queue, sending data to GPU.";
1001 #ifndef ENABLE_SECURITY
1002 uint64_t batch_record_start;
1003 uint64_t batch_record_end;
1004 batch_record_start = ProfilingTime::GetCurMilliSecond();
1005 #endif
1006 TensorRow current_row;
1007 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(¤t_row));
1008 first_fetch_flag_ = true;
1009 int64_t num_buf = 0;
1010 bool is_break_loop = false;
1011
1012 MS_LOG(INFO) << "Begin to send data to device, channel name: " << channel_name_;
1013
1014 while (!current_row.eof() && !is_break_loop && !device::DataQueueMgr::GetInstance().IsClosed()) {
1015 SendInfo send_info_per_epoch;
1016 send_info_per_epoch.epoch = op_current_epochs_ + 1;
1017 send_summary_.push_back(send_info_per_epoch);
1018 while (!current_row.eoe() && !is_break_loop && !device::DataQueueMgr::GetInstance().IsClosed()) {
1019 RETURN_IF_NOT_OK(FilterMetadata(¤t_row));
1020 RETURN_IF_NOT_OK(CheckExceptions(current_row));
1021 #ifndef ENABLE_SECURITY
1022 DetectPerBatchTime(&batch_record_start, &batch_record_end);
1023 #endif
1024
1025 if (create_data_info_queue_) {
1026 DATA_INFO data_info;
1027 (void)std::transform(current_row.begin(), current_row.end(), std::back_inserter(data_info),
1028 [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
1029 RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
1030 }
1031
1032 PrintBeginInfoWhenFirstBatch(first_push_flag_);
1033 RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_row)));
1034 PrintEndInfoWhenFirstBatch(&first_push_flag_);
1035 #ifndef ENABLE_SECURITY
1036 batch_record_start = ProfilingTime::GetCurMilliSecond();
1037 #endif
1038 if (NoExceptionRaised()) {
1039 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(¤t_row));
1040 } else {
1041 is_break_loop = true;
1042 }
1043 }
1044 UpdateRepeatAndEpochCounter();
1045 if (current_row.eoe()) {
1046 MS_LOG(INFO) << "EOE Detected";
1047 TensorRow eoe_flag(TensorRow::kFlagEOE);
1048 RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(eoe_flag)));
1049 }
1050
1051 if (NoExceptionRaised()) {
1052 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(¤t_row));
1053 } else {
1054 is_break_loop = true;
1055 }
1056 }
1057
1058 for (uint32_t index = 0; index < num_workers_; index++) {
1059 MS_LOG(INFO) << "Adding quit flag to Workers";
1060 TensorRow quit_flag(TensorRow::kFlagQuit);
1061 RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit_flag)));
1062 }
1063
1064 MS_LOG(INFO) << "Device queue received number of batches and EOEs: " << (num_buf - num_workers_);
1065 #else
1066 MS_LOG(WARNING) << "Gpu queue is not supported in ut tests.";
1067 #endif
1068 return Status::OK();
1069 }
1070
MallocForGPUData(std::vector<device::DataQueueItem> * items,const TensorRow & curr_row,const int32_t & worker_id)1071 Status DataQueueOp::MallocForGPUData(std::vector<device::DataQueueItem> *items, const TensorRow &curr_row,
1072 const int32_t &worker_id) {
1073 size_t i = 0;
1074 for (auto &sub_item : *items) {
1075 auto rc = pool_[static_cast<size_t>(worker_id)]->Allocate(sub_item.data_len, &sub_item.data_ptr);
1076 if (rc.IsError() || sub_item.data_ptr == nullptr) {
1077 RETURN_STATUS_OOM("Memory malloc failed, check memory usage.");
1078 }
1079 if (curr_row[i] == nullptr) {
1080 MS_LOG(ERROR) << "[Internal ERROR] The pointer curr_row[" << i << "] is null";
1081 RETURN_STATUS_UNEXPECTED("[Internal ERROR] TensorRow 'curr_row' contains nullptr.");
1082 }
1083 sub_item.data_type = curr_row[i]->type().ToString();
1084 const unsigned char *column_data = curr_row[i]->GetBuffer();
1085 if (memcpy_s(sub_item.data_ptr, sub_item.data_len, column_data,
1086 static_cast<uint32_t>(curr_row[i++]->SizeInBytes())) != 0) {
1087 MS_LOG(ERROR) << "[Internal ERROR] memcpy_s failed.";
1088 RETURN_STATUS_UNEXPECTED("[Internal ERROR] memcpy_s failed.");
1089 }
1090 }
1091
1092 return Status::OK();
1093 }
1094
ClearDevice()1095 Status DataQueueOp::ClearDevice() {
1096 #ifdef WITH_BACKEND
1097 MS_LOG(INFO) << "Clearing the data in GPU device: " << device_id_ << " channel: " << channel_name_;
1098 auto release_function = std::bind(&DataQueueOp::ReleaseData, this, std::placeholders::_1, std::placeholders::_2);
1099 auto ret = device::DataQueueMgr::GetInstance().Open(channel_name_, release_function);
1100 if (ret != DataQueueStatus::SUCCESS) {
1101 RETURN_STATUS_UNEXPECTED("[Internal ERROR] Failed to open channel for clearing the device.");
1102 }
1103
1104 ret = device::DataQueueMgr::GetInstance().Clear(channel_name_);
1105 CHECK_FAIL_RETURN_UNEXPECTED(ret == DataQueueStatus::SUCCESS, "Failed to clear the device.");
1106 device::DataQueueMgr::GetInstance().Close(channel_name_);
1107 device::DataQueueMgr::GetInstance().CloseConfirm();
1108 #endif
1109 return Status::OK();
1110 }
1111
SendDataToCPU()1112 Status DataQueueOp::SendDataToCPU() {
1113 MS_LOG(INFO) << "Device queue, sending data to CPU.";
1114 int64_t total_batch = 0;
1115
1116 while (!(child_iterator_->EofHandled())) {
1117 TensorRow curr_row;
1118 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
1119
1120 if (!first_fetch_flag_) {
1121 first_fetch_flag_ = true;
1122 }
1123 if (!curr_row.empty()) {
1124 for (auto &tensor : curr_row) {
1125 MS_LOG(DEBUG) << "Feature size is " << tensor->SizeInBytes() << ".";
1126 }
1127 total_batch++;
1128 if (stop_send_) {
1129 break;
1130 }
1131 }
1132 }
1133
1134 MS_LOG(INFO) << "Device queue total batch is " << total_batch << ".";
1135
1136 return Status::OK();
1137 }
1138
Print(std::ostream & out,bool show_all) const1139 void DataQueueOp::Print(std::ostream &out, bool show_all) const {
1140 if (!show_all) {
1141 // Call the super class for displaying any common 1-liner info
1142 PipelineOp::Print(out, show_all);
1143 // Then show any custom derived-internal 1-liner info for this op
1144 out << "\n";
1145 } else {
1146 // Call the super class for displaying any common detailed info
1147 PipelineOp::Print(out, show_all);
1148 // Then show any custom derived-internal stuff
1149 out << "\nChannel name: " << channel_name_ << "\n\n";
1150 }
1151 }
1152
1153 #ifndef ENABLE_SECURITY
ProfilingRecorder(bool is_profiling_enable,const std::shared_ptr<DeviceQueueTracing> & profiling_node,int64_t send_batch,int32_t tdt_cost,uint64_t * batch_start_time,uint64_t * end_time,int32_t connector_capacity,int32_t connector_size) const1154 void DataQueueOp::ProfilingRecorder(bool is_profiling_enable, const std::shared_ptr<DeviceQueueTracing> &profiling_node,
1155 int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time,
1156 uint64_t *end_time, int32_t connector_capacity, int32_t connector_size) const {
1157 // Record the pipeline profiling info
1158 if (is_profiling_enable) {
1159 *end_time = ProfilingTime::GetCurMilliSecond();
1160 // record push tdt time
1161 profiling_node->Record(TIME, TDT_PUSH_TIME, send_batch + 1, tdt_cost, *end_time);
1162 int32_t batch_cost = (int32_t)(*end_time - *batch_start_time);
1163 // record batch time
1164 profiling_node->Record(TIME, BATCH_TIME, send_batch + 1, batch_cost, *end_time);
1165 // record pipeline time
1166 profiling_node->Record(TIME, PIPELINE_TIME, send_batch + 1, batch_cost - tdt_cost, *end_time);
1167 *batch_start_time = *end_time;
1168 // record connector depth
1169 profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, send_batch + 1, connector_size, *end_time);
1170 }
1171 }
1172
DetectFirstBatch()1173 Status DataQueueOp::DetectFirstBatch() {
1174 TaskManager::FindMe()->Post();
1175 uint8_t count_num = 0;
1176 uint64_t temp_start_time = ProfilingTime::GetCurMilliSecond();
1177 constexpr int check_interval = 200;
1178 while (true) {
1179 RETURN_IF_INTERRUPTED();
1180 std::this_thread::sleep_for(std::chrono::milliseconds(check_interval));
1181 uint64_t temp_end_time = ProfilingTime::GetCurMilliSecond();
1182 // if fetch first batch, or detect 3 or more times and unable fetch first batch, exist with already printed Warning
1183 if (first_fetch_flag_ == true || count_num > 2) {
1184 break;
1185 } else if (temp_end_time - temp_start_time > kTimeOutMilliSeconds) {
1186 count_num++;
1187 MS_LOG(WARNING) << "Bad performance attention, it waits more than " +
1188 std::to_string(kTimeOutMilliSeconds / 1000) +
1189 " seconds and unable to fetch first Batch of "
1190 "data from dataset pipeline, which might result `GetNext` timeout problem. You may test "
1191 "dataset processing performance (with creating dataset iterator) and optimize it. Notes: "
1192 "shuffle operation is turn on for loading Dataset in default, which may effect first batch "
1193 "loading time.";
1194 }
1195 }
1196 return Status::OK();
1197 }
1198
DetectPerBatchTime(const uint64_t * start_time,uint64_t * end_time)1199 void DataQueueOp::DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time) {
1200 *end_time = ProfilingTime::GetCurMilliSecond();
1201 auto interval = *end_time - *start_time;
1202 constexpr auto kTimeMilliSeconds = 1000.;
1203 send_summary_.back().record_data(interval / kTimeMilliSeconds);
1204 if (interval > kTimeOutMilliSeconds) {
1205 MS_LOG(WARNING) << "Bad performance attention, it takes more than " + std::to_string(kTimeOutMilliSeconds / 1000) +
1206 " seconds to fetch a batch of data from dataset "
1207 "pipeline, which might result `GetNext` timeout problem. You may test dataset processing"
1208 " performance(with creating dataset iterator) and optimize it.";
1209 }
1210 }
1211 #endif
1212
PrintBeginInfoWhenFirstBatch(const bool & first_push_flag) const1213 void DataQueueOp::PrintBeginInfoWhenFirstBatch(const bool &first_push_flag) const {
1214 if (first_push_flag != true) {
1215 MS_LOG(INFO) << "Loading dataset and begin to push first batch into device ...";
1216 }
1217 }
1218
PrintEndInfoWhenFirstBatch(bool * first_push_flag) const1219 void DataQueueOp::PrintEndInfoWhenFirstBatch(bool *first_push_flag) const {
1220 if (!first_push_flag) {
1221 MS_LOG(WARNING) << "First batch flag: first_push_flag is nullptr";
1222 return;
1223 }
1224 if (*first_push_flag != true) {
1225 MS_LOG(INFO) << "Loading dataset and push first batch into device successful.";
1226 *first_push_flag = true;
1227 }
1228 }
1229
RetryPushData(const std::vector<DataQueueItem> & items,const bool profiling,uint64_t * push_time)1230 Status DataQueueOp::RetryPushData(const std::vector<DataQueueItem> &items, const bool profiling, uint64_t *push_time) {
1231 #ifdef WITH_BACKEND
1232 bool flag_log = false;
1233 #ifndef ENABLE_SECURITY
1234 uint64_t start_time = 0;
1235 if (profiling) {
1236 start_time = ProfilingTime::GetCurMilliSecond();
1237 }
1238 #endif
1239 while (!device::DataQueueMgr::GetInstance().IsClosed() && !TaskManager::FindMe()->Interrupted()) {
1240 DataQueueStatus ret = device::DataQueueMgr::GetInstance().Push(channel_name_, items, WAIT_TIME);
1241 if (ret != DataQueueStatus::SUCCESS) {
1242 if (ret == DataQueueStatus::ERROR_INPUT) {
1243 device::DataQueueMgr::GetInstance().Close(channel_name_);
1244 device::DataQueueMgr::GetInstance().CloseConfirm();
1245 return Status(
1246 StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
1247 "Invalid data, the types or shapes of current row is different with previous row(i.e. do batch operation but "
1248 "drop_reminder is False, or without resize image into the same size, these will cause shapes differs).");
1249 } else {
1250 if (!stop_send_) {
1251 if (!flag_log) {
1252 MS_LOG(DEBUG) << "Retry pushing data...";
1253 flag_log = true;
1254 }
1255 continue;
1256 }
1257 break;
1258 }
1259 } else {
1260 break;
1261 }
1262 }
1263 #ifndef ENABLE_SECURITY
1264 if (profiling) {
1265 *push_time = ProfilingTime::GetCurMilliSecond() - start_time;
1266 }
1267 #endif
1268 #endif
1269 return Status::OK();
1270 }
1271
SendDataToAscendDynamic()1272 Status DataQueueOp::SendDataToAscendDynamic() {
1273 #ifdef WITH_BACKEND
1274 MS_LOG(DEBUG) << "Dynamic Data queue, sending data to Ascend.";
1275 int64_t send_batch = 0;
1276 uint64_t data_queue_cost = 0;
1277 bool is_break_loop = false;
1278
1279 RETURN_IF_NOT_OK(CreateDynamicDataQueue());
1280 std::function<void(void *, int32_t)> release_function([](void *, int32_t) { return; });
1281 auto ret = device::DataQueueMgr::GetInstance().Open(channel_name_, release_function);
1282 if (ret != DataQueueStatus::SUCCESS) {
1283 return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
1284 "[Internal ERROR] Failed to open channel for sending data.");
1285 }
1286
1287 TensorRow curr_row;
1288 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
1289 first_fetch_flag_ = true;
1290
1291 while (!curr_row.eof() && !is_break_loop) {
1292 while (!curr_row.eoe() && !is_break_loop) {
1293 RETURN_IF_NOT_OK(FilterMetadata(&curr_row));
1294 RETURN_IF_NOT_OK(CheckExceptions(curr_row));
1295 std::vector<device::DataQueueItem> items = ConvertTensorRowToDataQueueItem(curr_row);
1296 RETURN_IF_NOT_OK(RetryPushData(items, false, &data_queue_cost));
1297 if (create_data_info_queue_) {
1298 DATA_INFO data_info;
1299 (void)std::transform(curr_row.begin(), curr_row.end(), std::back_inserter(data_info),
1300 [](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
1301 RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
1302 }
1303 send_batch++;
1304 if (total_batch_ > 0 && send_batch >= total_batch_) {
1305 is_break_loop = true;
1306 break;
1307 }
1308
1309 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
1310 }
1311
1312 if (curr_row.eoe() && send_epoch_end_) {
1313 MS_LOG(INFO) << "an epoch has already sent, now stop send data.";
1314 stop_send_ = true;
1315 }
1316
1317 RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
1318 }
1319
1320 // now we use this flag to judge whether exception raised.
1321 if (stop_send_ || !TaskManager::FindMe()->Interrupted()) {
1322 send_finished_ = true;
1323 }
1324 tree_->SetFinished();
1325 MS_LOG(INFO) << "ExecutionTree finished. Device queue sent number of batches: " << send_batch;
1326
1327 device::DataQueueMgr::GetInstance().Close(channel_name_);
1328 device::DataQueueMgr::GetInstance().CloseConfirm();
1329 #endif
1330 return Status::OK();
1331 }
1332 } // namespace dataset
1333 } // namespace mindspore
1334