/**
 * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DEVICE_QUEUE_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DEVICE_QUEUE_OP_H_

#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "minddata/dataset/engine/datasetops/pipeline_op.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h"
#include "minddata/dataset/engine/dataset_iterator.h"

#include "minddata/dataset/engine/perf/device_queue_tracing.h"
#include "minddata/dataset/util/status.h"
#ifdef ENABLE_DUMP_IR
#include "debug/rdr/running_data_recorder.h"
#include "minddata/dataset/util/rdr.h"
#endif

#ifdef ENABLE_TDTQUE
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/engine/tdt/tdt_plugin.h"
#endif

#ifdef ENABLE_GPUQUE
#include "minddata/dataset/engine/gpu_item_connector.h"
#include "minddata/dataset/util/circular_pool.h"
#include "runtime/device/gpu/gpu_buffer_mgr.h"
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
using mindspore::device::BlockQueueStatus_T;
using mindspore::device::GpuBufferMgr;
#endif
namespace mindspore {
namespace dataset {
using DATA_INFO = std::vector<std::pair<DataType, TensorShape>>;
using DATA_INFO_QUEUE = Queue<DATA_INFO>;

constexpr int32_t kTimeOutMilliSeconds = 25000;
const int kDataInfoQueueCapacity = 128;
class DeviceQueueOp : public PipelineOp {
 public:
  static const uint32_t INVALID_HANDLE = 0xffffffffUL;
  static const uint32_t WAIT_TIME = 5;

  enum class DeviceType { Ascend = 0, GPU = 1, CPU = 2 };

  // Name: constructor
  // Description: Create a DeviceQueueOp that sends rows from the pipeline to the given device channel
  DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
                bool send_epoch_end, int32_t total_batch, bool create_data_info_queue);

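  // A minimal construction sketch (hedged: the argument values below are hypothetical
  // illustrations, not defaults of this class):
  //
  //   auto op = std::make_shared<DeviceQueueOp>(
  //       "queue_0",                          // channel_name
  //       DeviceQueueOp::DeviceType::Ascend,  // device_type
  //       0,                                  // device_id
  //       2,                                  // prefetch_size
  //       true,                               // send_epoch_end
  //       -1,                                 // total_batch (illustrative value)
  //       false);                             // create_data_info_queue
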
  // Name: destructor
  // Description: Release the resources held by this op
  ~DeviceQueueOp();

  /// \brief Getter function
  /// \return Connector size of the current op
  int32_t ConnectorSize() const { return ChildOpConnectorSize(); }

  Status EoeReceived(int32_t worker_id) override;

  const int32_t GetPrefetchSize() { return prefetch_size_; }

  void StopSend() { stop_send_ = true; }

  void ContinueSend() {
    MS_LOG(INFO) << "continue send at the beginning of the epoch";
    stop_send_ = false;
  }

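  // A hedged sketch of how StopSend()/ContinueSend() are typically driven; the controlling
  // side (e.g. a host-side epoch callback) is an assumption, not something defined in this header:
  //
  //   op->StopSend();      // pause pushing rows to the device at an epoch boundary
  //   // ... host-side work between epochs ...
  //   op->ContinueSend();  // resume pushing at the beginning of the next epoch
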
#ifdef ENABLE_TDTQUE
  void StopWaiting() { ascend_keep_waiting_ = false; }
#endif

  Status GetDataInfo(DATA_INFO *data_info);

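  // A usage sketch for GetDataInfo() (hedged: assumes create_data_info_queue was true at
  // construction, and that DataType/TensorShape stream to a log as elsewhere in minddata):
  //
  //   DATA_INFO info;
  //   if (op->GetDataInfo(&info).IsOk()) {
  //     for (const auto &type_and_shape : info) {
  //       MS_LOG(INFO) << "dtype: " << type_and_shape.first << " shape: " << type_and_shape.second;
  //     }
  //   }
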
  // Name: Print()
  // Description: A function that prints info about the node
  void Print(std::ostream &out,              // In: The output stream to print to
             bool show_all) const override;  // In: T/F if it should print everything

  // Provide a stream operator for displaying the node
  friend std::ostream &operator<<(std::ostream &out, const DeviceQueueOp &to) {
    to.Print(out, false);
    return out;
  }

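  // Because operator<< forwards to Print(out, false), an op can be logged directly, e.g.:
  //
  //   std::cout << *op;  // prints the summary (show_all = false) form
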
  Status operator()() override;
#ifndef ENABLE_SECURITY
  // Record the pipeline profiling info
  void ProfilingRecorder(bool is_profiling_enable, std::shared_ptr<DeviceQueueTracing> profiling_node,
                         int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time, uint64_t *end_time,
                         int32_t connector_capacity, int32_t connector_size);
#endif

  // Op name getter
  // @return Name of the current Op
  std::string Name() const override { return kDeviceQueueOp; }

 private:
  // Name: FilterMetadata(TensorRow *)
  // Description: Auto-filter metadata columns before sending to the device
  Status FilterMetadata(TensorRow *row);

  // Name: CheckExceptions(TensorRow)
  // Description: Check whether the TensorRow meets the conditions for DeviceQueueOp
  Status CheckExceptions(const TensorRow &row) const;

  // Name: PrintBeginInfoWhenFirstBatch(bool)
  // Description: Print info when the first batch begins to send in sink mode
  void PrintBeginInfoWhenFirstBatch(const bool &first_push_flag);

  // Name: PrintEndInfoWhenFirstBatch(bool)
  // Description: Print info when the first batch is sent successfully in sink mode
  void PrintEndInfoWhenFirstBatch(bool *first_push_flag);
 private:
#ifdef ENABLE_TDTQUE
  void WaitContinueSignal() const;
  Status SendDataToAscend();
  void LimitSendingBatches(int64_t send_batch, int64_t *sending_num, std::shared_ptr<ConfigManager> cfg);
  Status SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost);
  bool ascend_keep_waiting_;
#endif

#ifdef ENABLE_GPUQUE
  Status SendDataToGPU();
  Status MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row, const int32_t &worker_id);
  Status RetryPushData(unsigned int handle, const std::vector<DataItemGpu> &data);
  void ReleaseData(void *addr, int32_t worker_id);
  Status LaunchParallelCopyThread();
  Status PushDataToGPU();
  Status WorkerEntry(int32_t worker_id);
  Status SetThreadDevice();

  QueueList<TensorRow> receive_queues_;
  std::vector<std::shared_ptr<MemoryPool>> pool_;
  std::unique_ptr<GpuItemConnector> gpu_item_connector_;
  const uint32_t kDeviceQueGpuNumThreads = 2;
  const uint32_t kDeviceQueGpuQueueCapacity = 8;
  const uint32_t kDeviceQueGpuThreadMemory = 1024;
  uint32_t num_workers_;
  uint32_t queue_capacity_;
  // This rank_id is for the device queue; one process works with only one rank_id.
  // In the standalone scenario, the rank_id may come from the env var 'CUDA_VISIBLE_DEVICES';
  // in the distributed scenario, it comes from _get_global_rank() in Python.
  uint32_t rank_id_;
#endif

  Status SendDataToCPU();
#ifndef ENABLE_SECURITY
  // Create an async thread to detect whether fetching the first batch takes too long
  Status DetectFirstBatch();

  // Measure the time cost of each batch and print an alarm message if it takes too long
  void DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time);
#endif

  std::unique_ptr<ChildIterator> child_iterator_;
  std::string channel_name_;
  DeviceType device_type_;
  const int32_t device_id_;
  const int32_t prefetch_size_;
  const bool send_epoch_end_;
  bool stop_send_;
  bool send_finished_;
  int32_t total_batch_;
  bool create_data_info_queue_;
  std::unique_ptr<DATA_INFO_QUEUE> data_info_queue_ptr_;
  std::atomic<bool> first_fetch_flag_;
  std::mutex data_info_mutex_;
  bool first_push_flag_;  // default: false; set to true after the first push

#ifdef ENABLE_TDTQUE
  std::shared_ptr<TdtPlugin> tdtInstancePtr;
#endif
#ifdef ENABLE_DUMP_IR
  std::shared_ptr<MDChannelInfo> md_channel_info_;
#endif
};
}  // namespace dataset
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DEVICE_QUEUE_OP_H_