/**
 * Copyright 2019-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DATA_QUEUE_OP_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DATA_QUEUE_OP_H_

#include <atomic>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "minddata/dataset/engine/datasetops/pipeline_op.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h"
#include "minddata/dataset/engine/dataset_iterator.h"

#include "minddata/dataset/engine/perf/device_queue_tracing.h"
#include "minddata/dataset/util/status.h"
#ifdef ENABLE_DUMP_IR
#include "minddata/dataset/util/rdr.h"
#endif
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/util/circular_pool.h"
#include "mindspore/ccsrc/include/backend/data_queue/data_queue.h"

namespace mindspore {
namespace dataset {
class GpuConnector;
using DATA_INFO = std::vector<std::pair<DataType, TensorShape>>;
using DATA_INFO_QUEUE = Queue<DATA_INFO>;
using mindspore::device::DataQueueItem;
using mindspore::device::DataQueueStatus;
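// Timeout for blocking waits, in milliseconds (60 s).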
constexpr int32_t kTimeOutMilliSeconds = 60000;
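// Capacity of the data info queue created when create_data_info_queue is true.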
const int kDataInfoQueueCapacity = 128;

class DataQueueOp : public PipelineOp {
 public:
  static const uint32_t INVALID_HANDLE = 0xffffffffUL;
  const uint32_t WAIT_TIME = 5;

  enum class DeviceType { Ascend = 0, GPU = 1, CPU = 2 };

  // Name: constructor
  // Description: Construct a DataQueueOp that sends data from the pipeline to the given device.
  DataQueueOp(const std::string channel_name, DeviceType device_type, int32_t device_id, bool send_epoch_end,
              int32_t total_batch, bool create_data_info_queue);

  // Name: destructor
  // Description: Release the resources held by this op.
  ~DataQueueOp();

  /// \brief Getter function
  /// \return Connector size of the current op
  int32_t ConnectorSize() const { return ChildOpConnectorSize(); }

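  // Callback invoked when an end-of-epoch (EOE) flag is received from the child operator.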
  Status EoeReceived(int32_t worker_id) override;

  void StopSend() {
    MS_LOG(INFO) << "Received signal to stop sending data to device.";
    stop_send_ = true;
    send_finished_ = true;
  }

  void ContinueSend() {
    MS_LOG(INFO) << "Continue sending data at the beginning of the epoch.";
    stop_send_ = false;
  }

  void StopWaiting() { ascend_keep_waiting_ = false; }

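  // Clear any data that remains on the device-side queue.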
  Status ClearDevice();

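  // Fetch the type/shape information of the next batch from the data info queue.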
  Status GetDataInfo(DATA_INFO *data_info);

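  // Query the current size of the host-side mbuf queue.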
  Status GetMbufQueueSize(size_t *queue_size);

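  // Return the send statistics collected in sink mode (see SendInfo below).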
  std::vector<std::vector<double>> GetSendInfo();

  // Name: Print()
  // Description: A function that prints info about the node
  void Print(std::ostream &out,              // In: The output stream to print to
             bool show_all) const override;  // In: T/F if it should print everything

  // Provide stream operator for displaying it
  friend std::ostream &operator<<(std::ostream &out, const DataQueueOp &to) {
    to.Print(out, false);
    return out;
  }

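  // Main entry point of the op: drives the loop that sends data to the target device.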
  Status operator()() override;
#ifndef ENABLE_SECURITY
  // Record the pipeline profiling info
  void ProfilingRecorder(bool is_profiling_enable, const std::shared_ptr<DeviceQueueTracing> &profiling_node,
                         int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time, uint64_t *end_time,
                         int32_t connector_capacity, int32_t connector_size) const;

#endif
  // Op name getter
  // @return Name of the current Op
  std::string Name() const override { return kDeviceQueueOp; }

 private:
  // Name: FilterMetadata(TensorRow *);
  // Description: Automatically filter metadata columns before sending to the device.
  Status FilterMetadata(TensorRow *row) const;

  // Name: CheckExceptions(TensorRow);
  // Description: Check whether the TensorRow meets the conditions for performing DataQueueOp
  Status CheckExceptions(const TensorRow &row) const;

  // Name: PrintBeginInfoWhenFirstBatch(bool)
  // Description: Print info when the first batch begins to be sent in sink_mode
  void PrintBeginInfoWhenFirstBatch(const bool &first_push_flag) const;

  // Name: PrintEndInfoWhenFirstBatch(bool)
  // Description: Print info when the first batch is sent successfully in sink_mode
  void PrintEndInfoWhenFirstBatch(bool *first_push_flag) const;
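  // Push a batch of data items to the device queue, retrying on transient failures.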
  Status RetryPushData(const std::vector<DataQueueItem> &data, bool profiling, uint64_t *push_time);
  bool NoExceptionRaised() const;
  Status SendDataToAscendDynamic();

  void WaitContinueSignal() const;
  Status SendDataToAscend();
  Status SendEpochEndToAscend(const TensorRow &curr_row, const bool &is_profiling_enable, int32_t *tdt_cost,
                              bool *is_break_loop);
  // Queue control logic for the host-side mbuf queue, to prevent it from hanging or exiting abnormally
  Status WaitForAscendQueue(size_t batch_data_len);

  // After the multi-stage pipeline cache prefetch is enabled, data is not pushed directly to the device queue.
  // It is pushed to the cache queue first, and then to the device queue once the cache analysis is complete.
  // Push prefetch cache data to the Ascend device queue.
  Status PushPrefetchDataToAscend();
  // Push original data to the Ascend cache queue.
  Status PushDataToAscendCacheQueue(const TensorRow &curr_row);
  // Push prefetch cache data to the GPU device queue.
  Status PushPrefetchDataToGPU();
  // Push original data to the GPU cache queue.
  Status PushDataToGPUCacheQueue(std::vector<device::DataQueueItem> &&data_items);

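  // Throttle the send loop when a limit on the number of batches to send is configured.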
  void LimitSendingBatches(int64_t send_batch, int64_t *sending_num, const std::shared_ptr<ConfigManager> &cfg) const;
  Status SendRowToTdt(TensorRow curr_row, bool is_profiling_enable, int32_t *tdt_cost);
  // Check the status returned when pushing data to the device
  Status CheckPushStatus(DataQueueStatus status, bool stop_send, bool *send_finished, bool *is_break_loop);
  bool ascend_keep_waiting_;

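  // GPU path: worker threads copy host tensors into device memory and push them through the GPU connector.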
  Status SendDataToGPU();
  Status MallocForGPUData(std::vector<device::DataQueueItem> *items, const TensorRow &curr_row,
                          const int32_t &worker_id);
  void ReleaseData(void *addr, int32_t worker_id);
  Status LaunchParallelCopyThread();
  Status PushDataToGPU();
  Status WorkerEntry(int32_t worker_id);
  Status SetThreadDevice();
  Status CreateDynamicDataQueue();
  double CalMbufQueueMemory(size_t realtime_queue_size);
  void RecordProfilingData(bool is_profiling_enable, bool end_of_epoch, int32_t *connector_size,
                           int32_t *connector_capacity, const int64_t *send_batch) const;

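  // Queues, memory pools, and sizing parameters used by the GPU copy workers.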
  QueueList<TensorRow> receive_queues_;
  std::vector<std::shared_ptr<MemoryPool>> pool_;
  std::unique_ptr<GpuConnector> gpu_connector_;
  const uint32_t kDeviceQueGpuNumThreads = 2;
  const uint32_t kDeviceQueGpuQueueCapacity = 8;
  const int32_t kDeviceQueGpuThreadMemory = 1024;
  const uint32_t kDynamicHostQueueCapacity = 2;
  uint32_t num_workers_;
  uint32_t queue_capacity_;

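  // Send data when the target device is CPU.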
  Status SendDataToCPU();
#ifndef ENABLE_SECURITY
  // Create an async thread to detect whether fetching the first batch takes too long
  Status DetectFirstBatch();

  // Measure the time cost of each batch and print an alarm message if it takes too long
  void DetectPerBatchTime(const uint64_t *start_time, uint64_t *end_time);

  // Information about sending data in sink mode
  struct SendInfo {
    double epoch = 0;
    double fetch_data_num = 0;
    double fetch_data_time = 0;
    double first_data_time = 0;
    bool init = false;
    void record_data(double t) {
      if (!init) {
        first_data_time = t;
        init = true;
      }
      fetch_data_time += t;
      fetch_data_num += 1;
    }
  };
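  // Per-epoch send statistics collected in sink mode.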
  std::vector<SendInfo> send_summary_;
#endif

  std::unique_ptr<ChildIterator> child_iterator_;
  std::string channel_name_;
  DeviceType device_type_;
  const int32_t device_id_;
  const bool send_epoch_end_;
  bool stop_send_;
  bool send_finished_;
  int32_t total_batch_;
  bool create_data_info_queue_;
  std::unique_ptr<DATA_INFO_QUEUE> data_info_queue_ptr_;
  std::atomic<bool> first_fetch_flag_;
  std::mutex data_info_mutex_;
  bool first_push_flag_;  // default: false; set to true once the first push happens
  bool dynamic_shape_{false};
  std::deque<double> memory_per_batch_;
  std::shared_ptr<device::DataQueue> ascend_data_queue_;

#ifdef ENABLE_DUMP_IR
  std::shared_ptr<MDChannelInfo> md_channel_info_;
#endif

  // Whether to enable the cache prefetch multilevel pipeline, used in ps cache mode.
  bool enable_prefetch_cache_pipeline_{false};
};
}  // namespace dataset
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATASETOPS_DATA_QUEUE_OP_H_