• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_PERF_AUTO_TUNE_H_
18 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_PERF_AUTO_TUNE_H_
19 
20 #include <map>
21 #include <memory>
22 #include <mutex>
23 #include <string>
24 #include <vector>
25 #include "minddata/dataset/util/status.h"
26 #include "minddata/dataset/util/log_adapter.h"
27 #include "minddata/dataset/engine/execution_tree.h"
28 #include "minddata/dataset/engine/tree_adapter.h"
29 #include "minddata/dataset/engine/tree_modifier.h"
30 #include "minddata/dataset/engine/perf/profiling.h"
31 
32 namespace mindspore {
33 namespace dataset {
34 class TreeModifier;  // forward declaration; AutoTune below holds a std::unique_ptr<TreeModifier>
/// \brief Auto-tuner for the dataset pipeline.
/// Reads statistics through the ProfilingManager and sends change requests (number of workers,
/// connector capacity) to the operators of the tree through the TreeModifier.
35 class AutoTune {
36  public:
37   /// AutoTune constructor
38   /// \param tree_adap pointer to the tree adapter
39   /// \param profiling_mgr pointer to the profiler manager
40   AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr);
41 
42   ~AutoTune() = default;
43 
44   /// Function to create and launch the AutoTune thread.
45   /// \return Status object
46   Status LaunchThread();
47 
48  private:
49   /// Main entry function for AT, triggers loop function.
50   /// \return Status object
51   Status Main();
52 
53   /// Primary AT loop till exit
  /// \param output_intermediate_config NOTE(review): presumably enables writing the intermediate
  ///     AutoTune configuration while looping (see save_intermediate_autoconfig_) - confirm in the .cc
54   /// \return Status object
55   Status ATMainLoop(bool output_intermediate_config);
56 
57   /// \brief Helper to print the tree configuration
58   void PrintTreeConfiguration() const;
59 
60   /// \brief Helper to print the logs after/post the main loop in AutoTune
61   void PostMainLogging() const;
62 
63   /// \brief Helper to summarize the execution tree
64   /// \param[out] out An output vector of string to store the summary
65   /// \return Status object
66   Status SummarizeTreeConfiguration(std::vector<std::string> *out);
67 
68 #ifndef ENABLE_ANDROID
69   /// \brief Serialize the dataset and save the AT config (workers and queue size) to a json file
70   /// \param file_name Name of the file
71   /// \return Status object
72   Status SaveAutotuneConfig(const std::string &file_name);
73 
74   /// Setter for autotune_config_json_
75   /// \return Status code
76   Status SetAutotuneConfigJson();
77 #endif
78 
79   /// Function to collect info from the tree
80   /// \return Status code
81   Status CollectOpsInfo();
82 
83   /// Function to check for current step and execute logic
84   /// \return status code
85   Status RunIterationStep();
86 
87   /// Function to check for current epoch and execute logic
88   /// \return status code
89   Status RunIterationEpoch();
90 
91   /// The AutoTune logic for pipelines that executes every epoch
92   /// \return status code
93   Status RunIteration();
94 
95   /// Fetches connector size for steps or epoch based on mode
  /// \param[out] sizes vector that receives the fetched connector sizes
96   /// \return status code
97   Status GetConnectorSize(std::vector<int32_t> *sizes) const;
98 
99   /// Fetches connector capacity for steps or epoch based on mode
  /// \param[out] capacities vector that receives the fetched connector capacities
100   /// \return status code
101   Status GetConnectorCapacity(std::vector<int32_t> *capacities) const;
102 
103   /// Computes current connector queue util percentage
104   /// \param[out] usage_avg_last double return avg util percentage for connector queue
105   /// \param[out] avg_size double to return avg size (usage) of connector queue
106   /// \param[out] avg_capacity double to return avg capacity for connector queue
107   /// \return status code
108   Status GetConnectorUtil(double *usage_avg_last, double *avg_size, double *avg_capacity);
109 
110   /// Fetches Connector Queue empty frequency for steps or epoch based on mode
  /// \param[out] empty_freq float that receives the empty-queue frequency
111   /// \return status code
112   Status GetEmptyQueueFrequency(float *empty_freq) const;
113 
114   /// Check if the dataset pipeline is the bottleneck
115   /// \param[out] isBottleneck bool
116   /// \return Status code
117   Status IsDSaBottleneck(bool *isBottleneck);
118 
119   /// Reports whether the pipeline is sink or non-sink
120   /// \return bool (NOTE(review): presumably true for sink and false for non-sink - confirm in the .cc)
121   bool IsSink() const;
122 
123   const int32_t TO_PERCENT = 100;  // NOTE(review): presumably the factor used to express ratios as percentages
124   // system specifics
125   int32_t max_workers_;
126   const int32_t MIN_NUM_WORKERS = 1;
127   const int32_t MAX_QUEUE_SIZE = 128;
128   const int32_t MIN_QUEUE_SIZE = 1;
129   // Warmup specifics
130   const int32_t EPOCH_WARMUP = 1;
131   const int64_t STEP_WARMUP = 150;
132   // Worker specifics
133   const int32_t INCREMENT_WORKER = 2;
134   const int32_t DECREMENT_WORKER = -1;
135   // Queue Specifics
136   const float_t INPUT_QUEUE_LOW = 0.5;
137 
138   // Value to maintain checking for device_queue utilization at.
139   const float_t DEVICE_CONNECTOR_UTIL_THRESHOLD = 0.75;
140 
141   const float_t LEAF_QUEUE_THRESHOLD = 0.9;
142   const float_t INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD = 0.35;
143   const int64_t INCREMENT_QUEUE_SIZE = 4;
144   // CPU Specifics
  // NOTE(review): these two thresholds are written on a 0-100 scale, unlike the 0-1 queue thresholds above.
145   const float_t MAP_OP_WORKER_HIGH_THRESHOLD = 75;
146   const float_t MAP_OP_WORKER_LOW_THRESHOLD = 35;
147   // Running mode specifics
148   enum AutoTuneMode { kAutoTuneModeEpoch, kAutoTuneModeStep };
149   enum AutoTunePhase { kAutoTunePhaseTime, kAutoTunePhaseMemory, kAutoTuneEnd };
  // NOTE(review): "kAutotTuneMemCompare" is spelled with an extra 't'; renaming it requires updating every user.
150   enum AutoTuneMemPhase { kAutoTuneMemInit, kAutoTuneMemSet, kAutotTuneMemCompare };
151   // Early stop specifics
152   const int32_t EARLY_STOP_TRIAL_THRESHOLD_EPOCH = 4;
153   const int32_t EARLY_STOP_TRIAL_THRESHOLD_STEP = 10;
154   // Memory specifics
155   const float MEMORY_COMPARISON_LOWER_BOUND_PERCENT = 0.02;
156   const float QUEUE_REDUCTION_PERCENTAGE_EPOCH = 0.5;
157   const float QUEUE_REDUCTION_PERCENTAGE_STEP = 0.8;
158 
159   /// Get the out connector capacity of the operator
160   /// \param[in] op_id operator id
161   /// \param[out] capacity the capacity of the connector
162   /// \return Status code
163   Status GetOpConnectorCapacity(int32_t op_id, int64_t *capacity);
164 
165   /// Get the CPU usage of each operator in the pipeline
166   /// \param[out] ops_cpu_util map from op_id to cpu utilization
167   /// \return Status code
168   Status GetOpsCpuUtil(std::map<int32_t, double> *ops_cpu_util);
169 
170   /// Get the queue utilization of each operator in the pipeline
171   /// \param[out] out_ops_queue_util map from op_id to output queue utilization
172   /// \param[out] in_ops_queue_util map from op_id to input queue utilization
173   /// \note inline ops would report -1 in both input and output queue utilization
174   /// \return Status code
175   Status GetOpsQueueUtil(std::map<int32_t, double> *out_ops_queue_util, std::map<int32_t, double> *in_ops_queue_util);
176 
177   /// Get the number of workers for each operator in the pipeline
178   /// \param[out] ops_num_workers map from op_id to num_workers
179   /// \return Status code
180   Status GetOpsNumWorker(std::map<int32_t, int32_t> *ops_num_workers);
181 
182   /// Check whether an op is unsupported by AutoTune
183   /// \param op_id ID to check
184   /// \return bool to skip or not
185   bool SkipOpsCheck(int op_id);
186 
187   /// Main AutoTune algorithm
188   /// \return Status code
189   Status AnalyseTime();
190 
191   /// AutoTune memory algorithm
192   /// \return Status code
193   Status AnalyseMemory();
194 
195   /// Send a ChangeRequest to the operator to update the number of workers
196   /// \param op_id operator ID
197   /// \param old_workers Old number of workers for logging purposes
198   /// \param num_workers_requested pointer to the new number of workers (NOTE(review): passed as a non-const pointer, so the callee may adjust the value - confirm in the .cc)
199   /// \return Status code
200   Status RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t *num_workers_requested);
201 
202   /// Send a ChangeRequest to the operator to update the connector capacity
203   /// \param op_id operator ID
204   /// \param old_size Old size for logging purposes
205   /// \param new_size new size
206   /// \return Status code
207   Status RequestConnectorCapacityChange(int32_t op_id, int32_t old_size, int32_t new_size);
208 
209   /// Track the pipeline time of the current epoch into avg_pipeline_times_
210   /// \return Status code
211   Status TrackPipelineTime();
212 
213   /// Utility function to calculate the mean/average of a list of numbers
214   /// \tparam T type of the vector
215   /// \param items vector of T
216   /// \return double the calculated mean
217   template <typename T>
218   double Mean(const std::vector<T> &items) const;
219 
220   /// Get and update current epoch and step counts
221   /// \return Status Code
222   Status UpdateCurrentRunInfo();
223 
224   /// Decide whether warmup period is complete to start AT
225   /// \return the decision for skipping further or not
226   bool WarmupSkipCheck();
227 
228   /// Save current worker and queue size configurations
229   /// \return Status code
230   Status RegisterWorkersQueue();
231 
232   /// Reset values of workers and queue sizes for ops to saved best config
233   /// \return  Status code
234   Status ResetWorkersQueue();
235 
236   /// Compare current and previous metrics for memory performance in memory phase of
237   /// AT tuning. Logic can be changed without modification to primary function
238   /// \param prev_avg previous comparison value - normally pre-change in pipeline
239   /// \param cur_avg current comparison value - normally post-change in pipeline
240   /// \return decision on good (True) or bad (False) change in metric
241   bool MemoryPhaseCompareMetric(double prev_avg, double cur_avg);
242 
243   /// Pointer to the tree adapter to get tree info
244   TreeAdapter *tree_adapter_;
245   /// Pointer to the profiler manager to get statistics
246   ProfilingManager *profiling_manager_;
247   /// Unique_ptr to the tree modifier to handle change requests
248   std::unique_ptr<TreeModifier> tree_modifier_;
249 
250   /// mux to be used to sleep
251   std::mutex mux_;
252   /// Conditional variable used to sleep
253   CondVar cv_;
254 
255   /// a map from op_id to a pointer to the operator
256   std::map<int32_t, std::shared_ptr<DatasetOp>> ops_;
257   /// list of ids of the parallel ops (NOTE(review): the original comment said "all map_ops" - confirm which ops are collected)
258   std::vector<int32_t> parallel_ops_ids_;
259   /// ID of the leaf op
260   int32_t leaf_op_id_;
261   /// vector of pipeline time per epoch
262   std::vector<double> avg_pipeline_times_;
263 
264   /// the current epoch and step indices (starts from 1)
265   int32_t cur_epoch_running_;
266   int32_t last_epoch_autotuned_;
267   // step based auto-tuning specifics
268   int32_t cur_step_running_;
269   int64_t last_step_autotuned_;
270 
271   int32_t mode_;
272   int64_t step_gap_;
273   bool skip_flag_;
274   int32_t AT_phase_;
275   // tracking whether AT makes a change
276   bool AT_change_;
277 
278   // Phase 1 - Analyse Time
279   double phase_1_best_time_;
280   int32_t phase_1_no_improve_count_;
281   std::vector<int32_t> phase_1_best_workers;
282   std::vector<int32_t> phase_1_best_queue;
283 
284   // Phase 2 - Analyse Memory (NOTE(review): members below use a phase_3_ prefix although AutoTunePhase lists memory second)
285   int32_t count_down_;
286   int32_t phase_3_state_;
287   int32_t phase_3_ID_;
288   double avg_batch_time;
289   double phase_3_prev_avg_;
290   std::vector<int32_t> OP_values;
291 
292   /// True if should save AutoTune configuration
293   bool save_autoconfig_;
294 
295   /// Flag to enable saving of intermediate autotune config to disk
296   bool save_intermediate_autoconfig_{false};
297 
298   /// Filepath name of the final AutoTune Configuration JSON file
299   std::string autotune_json_filepath_;
300 
301   /// Serialized json of the optimized ir tree that holds the updated configuration (workers and queue size)
  /// NOTE(review): this header does not include the nlohmann json header directly; the type must arrive
  /// through one of the project includes above.
302   nlohmann::json autotune_config_json_;
303 };
304 }  // namespace dataset
305 }  // namespace mindspore
306 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_PERF_AUTO_TUNE_H_
307