1 /** 2 * Copyright 2019-2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DATA_QUEUE_OP_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DATA_QUEUE_OP_H_ 18 19 #include <deque> 20 #include <memory> 21 #include <string> 22 #include <utility> 23 #include <vector> 24 25 #include "minddata/dataset/engine/datasetops/pipeline_op.h" 26 #include "minddata/dataset/engine/datasetops/repeat_op.h" 27 #include "minddata/dataset/engine/dataset_iterator.h" 28 29 #include "minddata/dataset/engine/perf/device_queue_tracing.h" 30 #include "minddata/dataset/util/status.h" 31 #ifdef ENABLE_DUMP_IR 32 #include "minddata/dataset/util/rdr.h" 33 #endif 34 #include "minddata/dataset/util/queue.h" 35 #include "minddata/dataset/util/circular_pool.h" 36 #include "mindspore/ccsrc/include/backend/data_queue/data_queue.h" 37 38 namespace mindspore { 39 namespace dataset { 40 class GpuConnector; 41 using DATA_INFO = std::vector<std::pair<DataType, TensorShape>>; 42 using DATA_INFO_QUEUE = Queue<DATA_INFO>; 43 using mindspore::device::DataQueueItem; 44 using mindspore::device::DataQueueStatus; 45 constexpr int32_t kTimeOutMilliSeconds = 60000; 46 const int kDataInfoQueueCapacity = 128; 47 48 class DataQueueOp : public PipelineOp { 49 public: 50 static const uint32_t INVALID_HANDLE = 0xffffffffUL; 51 const uint32_t WAIT_TIME = 5; 52 53 enum class DeviceType { Ascend = 0, GPU = 1, CPU = 2 }; 54 55 // Name: constructor 56 // Description 57 DataQueueOp(const std::string channel_name, DeviceType device_type, int32_t device_id, bool send_epoch_end, 58 int32_t total_batch, bool create_data_info_queue); 59 60 // Name: destructor 61 // Description 62 ~DataQueueOp(); 63 64 /// \brief Getter function 65 /// \return connector size of current op ConnectorSize()66 int32_t ConnectorSize() const { return ChildOpConnectorSize(); } 67 68 Status EoeReceived(int32_t worker_id) override; 69 StopSend()70 void StopSend() { 71 MS_LOG(INFO) << "Received signal to stop sending data to device."; 72 stop_send_ = true; 73 send_finished_ = true; 74 } 75 ContinueSend()76 void ContinueSend() { 77 MS_LOG(INFO) << "continue send at the beginning of the epoch"; 78 stop_send_ = false; 79 } 80 StopWaiting()81 void StopWaiting() { ascend_keep_waiting_ = false; } 82 83 Status ClearDevice(); 84 85 Status GetDataInfo(DATA_INFO *data_info); 86 87 Status GetMbufQueueSize(size_t *queue_size); 88 89 std::vector<std::vector<double>> GetSendInfo(); 90 91 // Name: Print() 92 // Description: A function that prints info about the node 93 void Print(std::ostream &out, // In: The output stream to print to 94 bool show_all) const override; // In: T/F if it should print everything 95 96 // Provide stream operator for displaying it 97 friend std::ostream &operator<<(std::ostream &out, const DataQueueOp &to) { 98 to.Print(out, false); 99 return out; 100 } 101 102 Status operator()() override; 103 #ifndef ENABLE_SECURITY 104 // Record the pipeline profiling info 105 void ProfilingRecorder(bool is_profiling_enable, const std::shared_ptr<DeviceQueueTracing> &profiling_node, 106 int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time, uint64_t *end_time, 107 int32_t connector_capacity, int32_t connector_size) const; 108 109 #endif 110 // Op name getter 111 // @return Name of the current Op Name()112 std::string Name() const override { return kDeviceQueueOp; } 113 114 private: 115 // Name: FilterMetadata(TensorRow *); 116 // Description: Auto filter metadata column before sending to device. 117 Status FilterMetadata(TensorRow *row) const; 118 119 // Name: CheckExceptions(TensorRow); 120 // Description: Check whether the TensorRow meets the condition for performing DataQueueOp 121 Status CheckExceptions(const TensorRow &row) const; 122 123 // Name: PrintBeginInfoWhenFirstBatch(bool) 124 // Description: Print info when first batch begin to send in sink_mode 125 void PrintBeginInfoWhenFirstBatch(const bool &first_push_flag) const; 126 127 // Name: PrintEndInfoWhenFirstBatch(bool) 128 // Description: Print info when first batch send successful in sink_mode 129 void PrintEndInfoWhenFirstBatch(bool *first_push_flag) const; 130 Status RetryPushData(const std::vector<DataQueueItem> &data, bool profiling, uint64_t *push_time); 131 bool NoExceptionRaised() const; 132 Status SendDataToAscendDynamic(); 133 134 void WaitContinueSignal() const; 135 Status SendDataToAscend(); 136 Status SendEpochEndToAscend(const TensorRow &curr_row, const bool &is_profiling_enable, int32_t *tdt_cost, 137 bool *is_break_loop); 138 // Queue control logic for mbuf in host, to prevent from hang/exit abnormally 139 Status WaitForAscendQueue(size_t batch_data_len); 140 141 // After multi-stage pipeline cache prefetch is enabled, data is not directly pushed to the device queue but to the 142 // cache queue first, and then pushed to the device queue after the cache analysis is complete. Push prefetch cache 143 // data to Ascend device queue. 144 Status PushPrefetchDataToAscend(); 145 // Push origin data to Ascend cache queue. 146 Status PushDataToAscendCacheQueue(const TensorRow &curr_row); 147 // Push prefetch cache data to GPU device queue. 148 Status PushPrefetchDataToGPU(); 149 // Push origin data to GPU cache queue. 150 Status PushDataToGPUCacheQueue(std::vector<device::DataQueueItem> &&data_items); 151 152 void LimitSendingBatches(int64_t send_batch, int64_t *sending_num, const std::shared_ptr<ConfigManager> &cfg) const; 153 Status SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost); 154 // check status that push data into device 155 Status CheckPushStatus(DataQueueStatus status, bool stop_send, bool *send_finished, bool *is_break_loop); 156 bool ascend_keep_waiting_; 157 158 Status SendDataToGPU(); 159 Status MallocForGPUData(std::vector<device::DataQueueItem> *items, const TensorRow &curr_row, 160 const int32_t &worker_id); 161 void ReleaseData(void *addr, int32_t worker_id); 162 Status LaunchParallelCopyThread(); 163 Status PushDataToGPU(); 164 Status WorkerEntry(int32_t worker_id); 165 Status SetThreadDevice(); 166 Status CreateDynamicDataQueue(); 167 double CalMbufQueueMemory(size_t realtime_queue_size); 168 void RecordProfilingData(bool is_profiling_enable, bool end_of_epoch, int32_t *connector_size, 169 int32_t *connector_capacity, const int64_t *send_batch) const; 170 171 QueueList<TensorRow> receive_queues_; 172 std::vector<std::shared_ptr<MemoryPool>> pool_; 173 std::unique_ptr<GpuConnector> gpu_connector_; 174 const uint32_t kDeviceQueGpuNumThreads = 2; 175 const uint32_t kDeviceQueGpuQueueCapacity = 8; 176 const int32_t kDeviceQueGpuThreadMemory = 1024; 177 const uint32_t kDynamicHostQueueCapacity = 2; 178 uint32_t num_workers_; 179 uint32_t queue_capacity_; 180 181 Status SendDataToCPU(); 182 #ifndef ENABLE_SECURITY 183 // Create async thread to detect whether it takes too long and unable to fetch first batch 184 Status DetectFirstBatch(); 185 186 // Detect the cost time of each batch, present alarm message if cost too long 187 void DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time); 188 189 // Send information in sink mode 190 struct SendInfo { 191 double epoch = 0; 192 double fetch_data_num = 0; 193 double fetch_data_time = 0; 194 double first_data_time = 0; 195 bool init = false; record_dataSendInfo196 void record_data(double t) { 197 if (!init) { 198 first_data_time = t; 199 init = true; 200 } 201 fetch_data_time += t; 202 fetch_data_num += 1; 203 } 204 }; 205 std::vector<SendInfo> send_summary_; 206 #endif 207 208 std::unique_ptr<ChildIterator> child_iterator_; 209 std::string channel_name_; 210 DeviceType device_type_; 211 const int32_t device_id_; 212 const bool send_epoch_end_; 213 bool stop_send_; 214 bool send_finished_; 215 int32_t total_batch_; 216 bool create_data_info_queue_; 217 std::unique_ptr<DATA_INFO_QUEUE> data_info_queue_ptr_; 218 std::atomic<bool> first_fetch_flag_; 219 std::mutex data_info_mutex_; 220 bool first_push_flag_; // default: false, when first push, it will be true 221 bool dynamic_shape_{false}; 222 std::deque<double> memory_per_batch_; 223 std::shared_ptr<device::DataQueue> ascend_data_queue_; 224 225 #ifdef ENABLE_DUMP_IR 226 std::shared_ptr<MDChannelInfo> md_channel_info_; 227 #endif 228 229 // Whether to enable the cache prefetch multilevel pipeline, used in ps cache mode. 230 bool enable_prefetch_cache_pipeline_{false}; 231 }; 232 } // namespace dataset 233 } // namespace mindspore 234 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DATA_QUEUE_OP_H_ 235