1 /** 2 * Copyright 2021-2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_PERF_AUTO_TUNE_H_ 18 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_PERF_AUTO_TUNE_H_ 19 20 #include <map> 21 #include <memory> 22 #include <mutex> 23 #include <string> 24 #include <vector> 25 #include "minddata/dataset/util/status.h" 26 #include "minddata/dataset/util/log_adapter.h" 27 #include "minddata/dataset/engine/execution_tree.h" 28 #include "minddata/dataset/engine/tree_adapter.h" 29 #include "minddata/dataset/engine/tree_modifier.h" 30 #include "minddata/dataset/engine/perf/profiling.h" 31 32 namespace mindspore { 33 namespace dataset { 34 class TreeModifier; 35 class AutoTune { 36 public: 37 /// AutoTune constructor 38 /// \param tree_adap_ pointer to the tree adapter 39 /// \param profiling_mgr_ pointer to the profiler manager 40 AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr); 41 42 ~AutoTune() = default; 43 44 /// Function to create and launch the AutoTune thread. 45 /// \return Status object 46 Status LaunchThread(); 47 48 private: 49 /// Main entry function for AT, triggers loop function. 50 /// \return Status object 51 Status Main(); 52 53 /// Primary AT loop till exit 54 /// \return Status object 55 Status ATMainLoop(bool output_intermediate_config); 56 57 /// \brief Helper to print the tree configuration 58 void PrintTreeConfiguration() const; 59 60 /// \brief Helper to print the logs after/post the main loop in AutoTune 61 void PostMainLogging() const; 62 63 /// \brief Helper to summarize the execution tree 64 /// \param[out] out An output vector of string to store the summary 65 /// \return Status object 66 Status SummarizeTreeConfiguration(std::vector<std::string> *out); 67 68 #ifndef ENABLE_ANDROID 69 /// \brief Serialize the dataset and save the AT config (workers and queue size) to a json file 70 /// \param file_name Name of the file 71 /// \return Status object 72 Status SaveAutotuneConfig(const std::string &file_name); 73 74 /// Setter for autotune_config_json_ 75 /// \return Status code 76 Status SetAutotuneConfigJson(); 77 #endif 78 79 /// Function to collect info from the tree 80 /// \return Status code 81 Status CollectOpsInfo(); 82 83 /// Function to check for current step and execute logic 84 /// \return status code 85 Status RunIterationStep(); 86 87 /// Function to check for current epoch and execute logic 88 /// \return status code 89 Status RunIterationEpoch(); 90 91 /// The AutoTune logic for pipelines that executes every epoch 92 /// \return status code 93 Status RunIteration(); 94 95 /// Fetches connector size for steps or epoch based on mode 96 /// \return status code 97 Status GetConnectorSize(std::vector<int32_t> *sizes) const; 98 99 /// Fetches connector capacity for steps or epoch based on mode 100 /// \return status code 101 Status GetConnectorCapacity(std::vector<int32_t> *capacities) const; 102 103 /// Computes current connector queue util percentage 104 /// \param[out] usage_avg_last double return avg util percentage for connector queue 105 /// \param[out] avg_size double to return avg size (usage) of connector queue 106 /// \param[out] avg_capacity double to return avg capacity for connector queue 107 /// \return status code 108 Status GetConnectorUtil(double *usage_avg_last, double *avg_size, double *avg_capacity); 109 110 /// Fetches Connector Queue empty frequency for steps or epoch based on mode 111 /// \return status code 112 Status GetEmptyQueueFrequency(float *empty_freq) const; 113 114 /// Check if the dataset pipeline is the bottleneck 115 /// \param[out] isBottleneck bool 116 /// \return Status code 117 Status IsDSaBottleneck(bool *isBottleneck); 118 119 /// Returns true if the pipeline is sink or non-sink 120 /// \return bool 121 bool IsSink() const; 122 123 const int32_t TO_PERCENT = 100; 124 // system specifics 125 int32_t max_workers_; 126 const int32_t MIN_NUM_WORKERS = 1; 127 const int32_t MAX_QUEUE_SIZE = 128; 128 const int32_t MIN_QUEUE_SIZE = 1; 129 // Warmup specifics 130 const int32_t EPOCH_WARMUP = 1; 131 const int64_t STEP_WARMUP = 150; 132 // Worker specifics 133 const int32_t INCREMENT_WORKER = 2; 134 const int32_t DECREMENT_WORKER = -1; 135 // Queue Specifics 136 const float_t INPUT_QUEUE_LOW = 0.5; 137 138 // Value to maintain checking for device_queue utlization at. 139 const float_t DEVICE_CONNECTOR_UTIL_THRESHOLD = 0.75; 140 141 const float_t LEAF_QUEUE_THRESHOLD = 0.9; 142 const float_t INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD = 0.35; 143 const int64_t INCREMENT_QUEUE_SIZE = 4; 144 // CPU Specifics 145 const float_t MAP_OP_WORKER_HIGH_THRESHOLD = 75; 146 const float_t MAP_OP_WORKER_LOW_THRESHOLD = 35; 147 // Running mode specifics 148 enum AutoTuneMode { kAutoTuneModeEpoch, kAutoTuneModeStep }; 149 enum AutoTunePhase { kAutoTunePhaseTime, kAutoTunePhaseMemory, kAutoTuneEnd }; 150 enum AutoTuneMemPhase { kAutoTuneMemInit, kAutoTuneMemSet, kAutotTuneMemCompare }; 151 // Early stop specifics 152 const int32_t EARLY_STOP_TRIAL_THRESHOLD_EPOCH = 4; 153 const int32_t EARLY_STOP_TRIAL_THRESHOLD_STEP = 10; 154 // Memory specifics 155 const float MEMORY_COMPARISON_LOWER_BOUND_PERCENT = 0.02; 156 const float QUEUE_REDUCTION_PERCENTAGE_EPOCH = 0.5; 157 const float QUEUE_REDUCTION_PERCENTAGE_STEP = 0.8; 158 159 /// Get the out connector capacity of the operator 160 /// \param[in] op_id operator id 161 /// \param[out] capacity the capacity of the connector 162 /// \return Status code 163 Status GetOpConnectorCapacity(int32_t op_id, int64_t *capacity); 164 165 /// Get the CPU usage of each operator in the pipeline 166 /// \param[out] ops_cpu_util map from op_id to cpu utilization 167 /// \return Status code 168 Status GetOpsCpuUtil(std::map<int32_t, double> *ops_cpu_util); 169 170 /// Get the queue utilization of each operator in the pipeline 171 /// \param[out] ops_queue_util map from op_id to output queue utilization 172 /// \param[out] ops_queue_util map from op_id to input queue utilization 173 /// \note inline ops would report -1 in both input and output queue utilization 174 /// \return Status code 175 Status GetOpsQueueUtil(std::map<int32_t, double> *out_ops_queue_util, std::map<int32_t, double> *in_ops_queue_util); 176 177 /// Get the number of workers for each operator in the pipeline 178 /// \param[out] ops_num_workers map from op_id to num_workers 179 /// \return Status code 180 Status GetOpsNumWorker(std::map<int32_t, int32_t> *ops_num_workers); 181 182 /// Check whether an op is an unsupported by AutoTune 183 /// \param op_id ID to check 184 /// \return bool to skip or not 185 bool SkipOpsCheck(int op_id); 186 187 /// Main AutoTune algorithm 188 /// \return Status code 189 Status AnalyseTime(); 190 191 /// AutoTune memory algorithm 192 /// \return Status code 193 Status AnalyseMemory(); 194 195 /// Send a ChangeRequest to the operator to update the number of workers 196 /// \param op_id operator ID 197 /// \param old_workers Old number of workers for logging purposes 198 /// \param new_workers new number of worker 199 /// \return Status code 200 Status RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t *num_workers_requested); 201 202 /// Send a ChangeRequest to the operator to update the connector capacity 203 /// \param op_id operator ID 204 /// \param old_workers Old size for logging purposes 205 /// \param new_workers new size 206 /// \return Status code 207 Status RequestConnectorCapacityChange(int32_t op_id, int32_t old_size, int32_t new_size); 208 209 /// Track the pipeline time of the current epoch into avg_pipeline_times_ 210 /// \return Status code 211 Status TrackPipelineTime(); 212 213 /// Utility function to calculate the mean/average of a list of numbers 214 /// \tparam T type of the vector 215 /// \param items vector of T 216 /// \return double the calculated mean 217 template <typename T> 218 double Mean(const std::vector<T> &items) const; 219 220 /// Get and update current epoch and step counts 221 /// \return Status Code 222 Status UpdateCurrentRunInfo(); 223 224 /// Decide whether warmup period is complete to start AT 225 /// \return the decision for skipping further or not 226 bool WarmupSkipCheck(); 227 228 /// Save current worker and queue size configurations 229 /// \return Status code 230 Status RegisterWorkersQueue(); 231 232 /// Reset values of workers and queue sizes for ops to saved best config 233 /// \return Status code 234 Status ResetWorkersQueue(); 235 236 /// Compare current and previous metrics for memory performance in memory phase of 237 /// AT tuning. Logic can be changed without modification to primary function 238 /// \param prev_avg previous comparison value - normally pre-change in pipeline 239 /// \param cur_avg current comparison value - normally post-change in pipeline 240 /// \return decision on good (True) or bad (False) change in metric 241 bool MemoryPhaseCompareMetric(double prev_avg, double cur_avg); 242 243 /// Pointer to the tree adapter to get tree info 244 TreeAdapter *tree_adapter_; 245 /// Pointer to the profiler manager to get statistics 246 ProfilingManager *profiling_manager_; 247 /// Unique_ptr to the tree modifier to handle change requests 248 std::unique_ptr<TreeModifier> tree_modifier_; 249 250 /// mux to be used to sleep 251 std::mutex mux_; 252 /// Conditional variable used to sleep 253 CondVar cv_; 254 255 /// a map from op_id to a pointer to the operator 256 std::map<int32_t, std::shared_ptr<DatasetOp>> ops_; 257 /// list of all map_ops 258 std::vector<int32_t> parallel_ops_ids_; 259 /// ID of the leaf op 260 int32_t leaf_op_id_; 261 /// vector of pipeline time per epoch 262 std::vector<double> avg_pipeline_times_; 263 264 /// the current epoch and step indices (starts from 1) 265 int32_t cur_epoch_running_; 266 int32_t last_epoch_autotuned_; 267 // step based auto-tuning specifics 268 int32_t cur_step_running_; 269 int64_t last_step_autotuned_; 270 271 int32_t mode_; 272 int64_t step_gap_; 273 bool skip_flag_; 274 int32_t AT_phase_; 275 // tracking whether AT makes a change 276 bool AT_change_; 277 278 // Phase 1 - Analyse Time 279 double phase_1_best_time_; 280 int32_t phase_1_no_improve_count_; 281 std::vector<int32_t> phase_1_best_workers; 282 std::vector<int32_t> phase_1_best_queue; 283 284 // phase 2 - Analyse Memory 285 int32_t count_down_; 286 int32_t phase_3_state_; 287 int32_t phase_3_ID_; 288 double avg_batch_time; 289 double phase_3_prev_avg_; 290 std::vector<int32_t> OP_values; 291 292 /// True if should save AutoTune configuration 293 bool save_autoconfig_; 294 295 /// Flag to enable saving of intermediate autotune config to disk 296 bool save_intermediate_autoconfig_{false}; 297 298 /// Filepath name of the final AutoTune Configuration JSON file 299 std::string autotune_json_filepath_; 300 301 /// Serialized json of the optimized ir tree that holds the updated configuration (workers and queue size) 302 nlohmann::json autotune_config_json_; 303 }; 304 } // namespace dataset 305 } // namespace mindspore 306 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_PERF_AUTO_TUNE_H_ 307