/**
 * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DEVICE_QUEUE_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DEVICE_QUEUE_OP_H_

#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "minddata/dataset/engine/datasetops/pipeline_op.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h"
#include "minddata/dataset/engine/dataset_iterator.h"

#include "minddata/dataset/engine/perf/device_queue_tracing.h"
#include "minddata/dataset/util/status.h"
#ifdef ENABLE_DUMP_IR
#include "debug/rdr/running_data_recorder.h"
#include "minddata/dataset/util/rdr.h"
#endif

#ifdef ENABLE_TDTQUE
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/engine/tdt/tdt_plugin.h"
#endif

#ifdef ENABLE_GPUQUE
#include "minddata/dataset/engine/gpu_item_connector.h"
#include "minddata/dataset/util/circular_pool.h"
#include "runtime/device/gpu/gpu_buffer_mgr.h"
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
using mindspore::device::BlockQueueStatus_T;
using mindspore::device::GpuBufferMgr;
#endif

namespace mindspore {
namespace dataset {
using DATA_INFO = std::vector<std::pair<DataType, TensorShape>>;
using DATA_INFO_QUEUE = Queue<DATA_INFO>;

constexpr int32_t kTimeOutMilliSeconds = 25000;
const int kDataInfoQueueCapacity = 128;

class DeviceQueueOp : public PipelineOp {
 public:
  static const uint32_t INVALID_HANDLE = 0xffffffffUL;
  static const uint32_t WAIT_TIME = 5;

  enum class DeviceType { Ascend = 0, GPU = 1, CPU = 2 };

  // Name: constructor
  // Description: Initializes the op with the target device, channel name and send options.
  DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
                bool send_epoch_end, int32_t total_batch, bool create_data_info_queue);

  // Name: destructor
  // Description: Releases resources held by the op.
  ~DeviceQueueOp();

  /// \brief Getter function
  /// \return connector size of current op
  int32_t ConnectorSize() const { return ChildOpConnectorSize(); }

  Status EoeReceived(int32_t worker_id) override;

  const int32_t GetPrefetchSize() { return prefetch_size_; }

  void StopSend() { stop_send_ = true; }

  void ContinueSend() {
    MS_LOG(INFO) << "continue send at the beginning of the epoch";
    stop_send_ = false;
  }

#ifdef ENABLE_TDTQUE
  void StopWaiting() { ascend_keep_waiting_ = false; }
#endif

  Status GetDataInfo(DATA_INFO *data_info);
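
  // A minimal usage sketch for GetDataInfo() (illustrative only: the op handle,
  // loop body and logging are assumptions, not code from this repository):
  //
  //   DATA_INFO data_info;
  //   RETURN_IF_NOT_OK(op->GetDataInfo(&data_info));
  //   for (const auto &info : data_info) {
  //     MS_LOG(INFO) << "dtype: " << info.first << ", shape: " << info.second;
  //   }
  //
  // This requires create_data_info_queue to be true at construction so that
  // data_info_queue_ptr_ is initialized before rows are sent.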

  // Name: Print()
  // Description: A function that prints info about the node
  void Print(std::ostream &out,              // In: The output stream to print to
             bool show_all) const override;  // In: T/F if it should print everything

  // Provide stream operator for displaying it
  friend std::ostream &operator<<(std::ostream &out, const DeviceQueueOp &to) {
    to.Print(out, false);
    return out;
  }

  Status operator()() override;
#ifndef ENABLE_SECURITY
  // Record the pipeline profiling info
  void ProfilingRecorder(bool is_profiling_enable, std::shared_ptr<DeviceQueueTracing> profiling_node,
                         int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time, uint64_t *end_time,
                         int32_t connector_capacity, int32_t connector_size);

#endif
  // Op name getter
  // @return Name of the current Op
  std::string Name() const override { return kDeviceQueueOp; }

 private:
  // Name: FilterMetadata(TensorRow *);
  // Description: Auto filter metadata column before sending to device.
  Status FilterMetadata(TensorRow *row);

  // Name: CheckExceptions(TensorRow);
  // Description: Check whether the TensorRow meets the conditions for DeviceQueueOp to process it
  Status CheckExceptions(const TensorRow &row) const;

  // Name: PrintBeginInfoWhenFirstBatch(bool)
  // Description: Print info when the first batch begins to send in sink mode
  void PrintBeginInfoWhenFirstBatch(const bool &first_push_flag);

  // Name: PrintEndInfoWhenFirstBatch(bool)
  // Description: Print info when the first batch is sent successfully in sink mode
  void PrintEndInfoWhenFirstBatch(bool *first_push_flag);

 private:
#ifdef ENABLE_TDTQUE
  void WaitContinueSignal() const;
  Status SendDataToAscend();
  void LimitSendingBatches(int64_t send_batch, int64_t *sending_num, std::shared_ptr<ConfigManager> cfg);
  Status SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost);
  bool ascend_keep_waiting_;
#endif

#ifdef ENABLE_GPUQUE
  Status SendDataToGPU();
  Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row, const int32_t &worker_id);
  Status RetryPushData(unsigned int handle, const std::vector<DataItemGpu> &data);
  void ReleaseData(void *addr, int32_t worker_id);
  Status LaunchParallelCopyThread();
  Status PushDataToGPU();
  Status WorkerEntry(int32_t worker_id);
  Status SetThreadDevice();
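
  // A sketch of the retry loop RetryPushData() is expected to implement (an
  // assumption based on the GpuBufferMgr::Push(handle, data, timeout) interface
  // and the BlockQueueStatus_T codes; not the actual implementation):
  //
  //   BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, data, WAIT_TIME);
  //   while (ret == device::TIMEOUT && !stop_send_) {
  //     ret = GpuBufferMgr::GetInstance().Push(handle, data, WAIT_TIME);
  //   }
  //   return ret == device::SUCCESS ? Status::OK()
  //                                 : Status(StatusCode::kMDUnexpectedError, "Failed to push data to GPU queue.");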

  QueueList<TensorRow> receive_queues_;
  std::vector<std::shared_ptr<MemoryPool>> pool_;
  std::unique_ptr<GpuItemConnector> gpu_item_connector_;
  const uint32_t kDeviceQueGpuNumThreads = 2;
  const uint32_t kDeviceQueGpuQueueCapacity = 8;
  const uint32_t kDeviceQueGpuThreadMemory = 1024;
  uint32_t num_workers_;
  uint32_t queue_capacity_;
  // This rank_id is for the device queue; one process works with only one rank_id.
  // In the standalone scenario this rank_id may come from the env 'CUDA_VISIBLE_DEVICES',
  // but in the distributed scenario it comes from _get_global_rank() in Python.
  uint32_t rank_id_;
#endif

  Status SendDataToCPU();
#ifndef ENABLE_SECURITY
  // Create an async thread to detect whether fetching the first batch takes too long
  Status DetectFirstBatch();

  // Detect the cost time of each batch, and print an alarm message if it takes too long
  void DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time);
#endif

  std::unique_ptr<ChildIterator> child_iterator_;
  std::string channel_name_;
  DeviceType device_type_;
  const int32_t device_id_;
  const int32_t prefetch_size_;
  const bool send_epoch_end_;
  bool stop_send_;
  bool send_finished_;
  int32_t total_batch_;
  bool create_data_info_queue_;
  std::unique_ptr<DATA_INFO_QUEUE> data_info_queue_ptr_;
  std::atomic<bool> first_fetch_flag_;
  std::mutex data_info_mutex_;
  bool first_push_flag_;  // default: false; set to true after the first push

#ifdef ENABLE_TDTQUE
  std::shared_ptr<TdtPlugin> tdtInstancePtr;
#endif
#ifdef ENABLE_DUMP_IR
  std::shared_ptr<MDChannelInfo> md_channel_info_;
#endif
};
}  // namespace dataset
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DEVICE_QUEUE_OP_H_