1 /** 2 * Copyright 2019-2023 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_ 18 19 #include <atomic> 20 #include <ostream> 21 #include <sstream> 22 #include <string> 23 24 #include <nlohmann/json.hpp> 25 26 #include "minddata/dataset/include/dataset/constants.h" 27 #include "minddata/dataset/util/path.h" 28 #include "minddata/dataset/util/status.h" 29 30 // Config settings for the client-side 31 // example config file: 32 // { 33 // "numParallelWorkers": 3 34 // } 35 // 36 37 namespace mindspore { 38 namespace dataset { 39 const char kEmptyString[] = ""; 40 const char kJsonExtension[] = ".json"; 41 42 // The ConfigManager is a class for managing default values. When a user is constructing any objects 43 // in the framework, often they may choose to omit some settings instead of overriding them. 44 // This class manages some of the default values, for cases when the user does not manually specify 45 // those values. 46 class ConfigManager { 47 public: 48 ConfigManager(); 49 50 // destructor 51 ~ConfigManager() = default; 52 53 // A print method typically used for debugging 54 // @param out - The output stream to write output to 55 void Print(std::ostream &out) const; 56 57 // << Stream output operator overload 58 // @notes This allows you to write the debug print info using stream operators 59 // @param out - reference to the output stream being overloaded 60 // @param cS - reference to the ConfigManager to display 61 // @return - the output stream must be returned 62 friend std::ostream &operator<<(std::ostream &out, const ConfigManager &cS) { 63 cS.Print(out); 64 return out; 65 } 66 67 // Another debug print helper. Converts the print info to a string for you. 68 // @return The string version of the debug print ToString()69 std::string ToString() const { 70 std::stringstream ss; 71 ss << *this; 72 return ss.str(); 73 } 74 75 // Loads a json file with the default settings and populates all the settings 76 // @param settingsFile - A json file with a set of default settings 77 // @return Status error code 78 Status LoadFile(const std::string &settingsFile); 79 80 // getter function 81 // @return The number of workers setting num_parallel_workers()82 int32_t num_parallel_workers() const { return num_parallel_workers_; } 83 84 // getter function 85 // @return The queue size of the operator's output connector op_connector_size()86 int32_t op_connector_size() const { return op_connector_size_; } 87 88 // getter function 89 // @return The sending batches that will send to device sending_batches()90 int64_t sending_batches() const { return sending_batches_; } 91 92 // getter function 93 // @return The internal worker-to-master connector queue size worker_connector_size()94 int32_t worker_connector_size() const { return worker_connector_size_; } 95 num_cpu_threads()96 int32_t num_cpu_threads() const { return num_cpu_threads_; } 97 98 // getter function 99 // @return The hostname of cache server cache_host()100 std::string cache_host() const { return cache_host_; } 101 102 // getter function 103 // @return The port of cache server cache_port()104 int32_t cache_port() const { return cache_port_; } 105 106 /// getter function 107 /// \return Number of tcp/ip connection num_connections()108 int32_t num_connections() const { return num_connections_; } 109 110 /// getter function 111 /// \return Prefetch size cache_prefetch_size()112 int32_t cache_prefetch_size() const { return cache_prefetch_size_; } 113 114 /// getter function 115 /// \return auto_num_workers_ auto_num_workers()116 bool auto_num_workers() const { return auto_num_workers_; } 117 118 // setter function 119 // @param num_parallel_workers - The setting to apply to the config 120 // @return Status error code 121 Status set_num_parallel_workers(int32_t num_parallel_workers); 122 123 // setter function 124 // @param connector_size - The setting to apply to the config 125 void set_worker_connector_size(int32_t connector_size); 126 127 // setter function 128 // @param connector_size - The setting to apply to the config 129 void set_op_connector_size(int32_t connector_size); 130 131 // setter function 132 // @param sending_batches - The setting to apply to the config 133 void set_sending_batches(int64_t sending_batches); 134 135 // setter function 136 // @param cache_host - The hostname of cache server 137 void set_cache_host(std::string cache_host); 138 139 // setter function 140 // @param cache_port - The port of cache server 141 void set_cache_port(int32_t cache_port); 142 143 /// setter function 144 /// \param num_connections 145 void set_num_connections(int32_t num_connections); 146 147 /// setter function 148 /// \param cache_prefetch_size 149 void set_cache_prefetch_size(int32_t cache_prefetch_size); 150 151 /// setter function 152 /// \param numa_switch 153 void set_numa_enable(bool numa_enable); 154 155 /// getter function 156 /// Now we want to separate the numa link to _c_dataengine in the CMakeLists, 157 /// so we want user to choose whether to open numa switch. 158 /// @return Get the current numa switch state. numa_enable()159 bool numa_enable() const { return numa_enable_; } 160 161 // getter function 162 // This rank_id is for numa and device_queue, one process work with only one rank_id 163 // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', 164 // but for distribute scenario, this rank_id come from _get_global_rank() in python 165 // @return Get the current device id, for one process, it's only with one rank_id. rank_id()166 int32_t rank_id() const { return rank_id_; } 167 168 // setter function 169 // @param rank_id - Set the current device id 170 void set_rank_id(int32_t rank_id); 171 172 uint32_t seed() const; 173 174 // setter function 175 // @param seed - The default seed to use 176 void set_seed(uint32_t seed); 177 178 // setter function 179 // @param interval - The setting to apply to the config 180 void set_monitor_sampling_interval(uint32_t interval); 181 182 // getter function 183 // @return The interval of monitor sampling monitor_sampling_interval()184 uint32_t monitor_sampling_interval() const { return monitor_sampling_interval_; } 185 186 // setter function 187 // @param auto_num_workers - whether assign threads to each op automatically set_auto_num_workers(bool auto_num_workers)188 void set_auto_num_workers(bool auto_num_workers) { auto_num_workers_ = auto_num_workers; } 189 190 // setter function 191 // this function will be called when a distributed sampler (RT and Obj) is created and will be used by AutoWorkerPass 192 // This is to get around the limitation of PreBuildSampler (which doesn't have a getter for sharding params) 193 // @param num_shards set_num_shards_for_auto_num_workers(int32_t num_shards)194 void set_num_shards_for_auto_num_workers(int32_t num_shards) { auto_num_workers_num_shards_ = num_shards; } 195 196 // getter function, will be called by AutoNumWorker, user discretion above AutoNumWorker is advised 197 // @param num_shards_ get_num_shards_for_auto_num_workers()198 int32_t get_num_shards_for_auto_num_workers() const { return auto_num_workers_num_shards_; } 199 200 // setter function 201 // @param timeout - The setting to apply to the config 202 void set_callback_timeout(uint32_t timeout); 203 204 // getter function 205 // @return The timeout DSWaitedCallback would wait for before raising an error callback_timeout()206 uint32_t callback_timeout() const { return callback_timout_; } 207 208 // getter function 209 // E.g. 0 would corresponds to a 1:1:1 ratio of num_worker among leaf batch and map. 210 // please refer to AutoWorkerPass for detail on what each option is. 211 // @return The experimental config used by AutoNumWorker, each 1 refers to a different setup configuration get_auto_worker_config()212 uint8_t get_auto_worker_config() const { return auto_worker_config_; } 213 214 // setter function 215 // E.g. set the value of 0 would corresponds to a 1:1:1 ratio of num_worker among leaf batch and map. 216 // please refer to AutoWorkerPass for detail on what each option is. 217 // @return The experimental config used by AutoNumWorker, each 1 refers to a different setup configuration set_auto_worker_config_(uint8_t cfg)218 void set_auto_worker_config_(uint8_t cfg) { auto_worker_config_ = cfg; } 219 220 // setter function 221 // @param enable - To enable multiprocessing to use shared memory set_enable_shared_mem(bool enable)222 void set_enable_shared_mem(bool enable) { enable_shared_mem_ = enable; } 223 224 // getter function 225 // @return - Flag to indicate whether shared memory for multi-processing is enabled enable_shared_mem()226 bool enable_shared_mem() const { return enable_shared_mem_; } 227 228 // setter function 229 // @param offload - To enable automatic offloading of dataset ops set_auto_offload(bool offload)230 void set_auto_offload(bool offload) { auto_offload_ = offload; } 231 232 // getter function 233 // @return - Flag to indicate whether automatic offloading is enabled for the dataset get_auto_offload()234 bool get_auto_offload() const { return auto_offload_; } 235 236 // setter function 237 // @param enable - To enable autotune 238 // @param bool save_autoconfig - True if should save AutoTune data pipeline configuration 239 // @param json_filepath - JSON filepath where the final AutoTune data pipeline will be generated 240 // @return Status error code 241 Status set_enable_autotune(bool enable, bool save_autoconfig, const std::string &json_filepath); 242 243 // getter function 244 // @return - Flag to indicate whether autotune is enabled enable_autotune()245 bool enable_autotune() const { return enable_autotune_; } 246 247 // getter function 248 // @return - Flag to indicate whether to save AutoTune configuration save_autoconfig()249 bool save_autoconfig() const { return save_autoconfig_; } 250 251 // getter function 252 // @return - The final AutoTune configuration JSON filepath get_autotune_json_filepath()253 std::string get_autotune_json_filepath() { return autotune_json_filepath_; } 254 255 // getter function 256 // @return - autotune interval in steps autotune_interval()257 int64_t autotune_interval() const { return autotune_interval_; } 258 259 // setter function 260 // @param interval - autotune interval in steps set_autotune_interval(int64_t interval)261 void set_autotune_interval(int64_t interval) { autotune_interval_ = interval; } 262 263 // setter function 264 // @param enable - To enable watchdog python thread set_enable_watchdog(bool enable)265 void set_enable_watchdog(bool enable) { enable_watchdog_ = enable; } 266 267 // getter function 268 // @return - Flag to indicate whether watchdog python thread is enabled enable_watchdog()269 bool enable_watchdog() const { return enable_watchdog_; } 270 271 // getter function 272 // @return - multiprocessing timeout interval in seconds multiprocessing_timeout_interval()273 uint32_t multiprocessing_timeout_interval() const { return multiprocessing_timeout_interval_; } 274 275 // setter function 276 // @param interval - multiprocessing timeout interval in seconds set_multiprocessing_timeout_interval(uint32_t interval)277 void set_multiprocessing_timeout_interval(uint32_t interval) { multiprocessing_timeout_interval_ = interval; } 278 279 // setter function 280 // @param is_dynamic - Indicate whether the dataset is dynamic-shape set_dynamic_shape(bool is_dynamic)281 void set_dynamic_shape(bool is_dynamic) { dynamic_shape_ = is_dynamic; } 282 283 // getter function 284 // @return - Flag to indicate whether the dataset is dynamic-shape dynamic_shape()285 bool dynamic_shape() const { return dynamic_shape_; } 286 287 // setter function 288 // @notes User must also set the seed to be able to get same augmentations 289 // @notes Fast recovery can cause slightly different random augmentations than original run 290 // (System default = true) 291 // @param fast_recovery - Set whether MD pipeline recovers fast in failover reset set_fast_recovery(const bool fast_recovery)292 void set_fast_recovery(const bool fast_recovery) { fast_recovery_ = fast_recovery; } 293 294 // getter function 295 // @return - Flag to indicate whether md pipeline recovers fast in failover reset fast_recovery()296 bool fast_recovery() const { return fast_recovery_; } 297 298 // setter function 299 // @param debug_mode_flag - Set whether debug mode is on. When enabled, the dataset pipeline runs synchronously and 300 // sequentially. set_debug_mode(const bool debug_mode_flag)301 void set_debug_mode(const bool debug_mode_flag) { debug_mode_flag_ = debug_mode_flag; } 302 303 // getter function 304 // @return - Flag to indicate whether the debug mode is on get_debug_mode()305 bool get_debug_mode() const { return debug_mode_flag_; } 306 307 // setter function 308 // @param error_samples_mode - Set the method in which erroneous samples should be processed 309 // (System default = ErrorSamplesMode::kReturn) 310 // @notes For replacement of erroneous samples, MD will select a deterministic but "random" sample. set_error_samples_mode(const ErrorSamplesMode error_samples_mode)311 void set_error_samples_mode(const ErrorSamplesMode error_samples_mode) { error_samples_mode_ = error_samples_mode; } 312 313 // getter function 314 // @return - The method in which erroneous samples should be processed in a dataset pipeline 315 // @notes This method is used for external configuration API which returns integer type get_error_samples_mode()316 int32_t get_error_samples_mode() const { return static_cast<int>(error_samples_mode_); } 317 318 // getter function 319 // @return - The method in which erroneous samples should be processed in a dataset pipeline 320 // @notes This method is used for internal processing, using enum type error_samples_mode()321 ErrorSamplesMode error_samples_mode() const { return error_samples_mode_; } 322 323 private: 324 // Private helper function that takes a nlohmann json format and populates the settings 325 // @param j - The json nlohmann json info 326 Status FromJson(const nlohmann::json &j); 327 328 int32_t num_parallel_workers_; 329 int32_t worker_connector_size_; 330 int32_t op_connector_size_; 331 int64_t sending_batches_; 332 // This rank_id is for numa and device_queue, one process work with only one rank_id, 333 // for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES', 334 // but for distribute scenario, this rank_id come from _get_global_rank() in python 335 int32_t rank_id_; 336 uint32_t seed_; 337 uint32_t monitor_sampling_interval_; 338 uint32_t callback_timout_; 339 std::string cache_host_; 340 int32_t cache_port_; 341 int32_t num_connections_; 342 bool numa_enable_; 343 int32_t cache_prefetch_size_; 344 bool auto_num_workers_; 345 int32_t num_cpu_threads_; 346 int32_t auto_num_workers_num_shards_; 347 uint8_t auto_worker_config_; 348 bool enable_shared_mem_; 349 bool auto_offload_; 350 bool enable_autotune_; 351 bool save_autoconfig_; // True if should save AutoTune configuration 352 int64_t autotune_interval_; 353 bool enable_watchdog_; // Watchdog python thread enabled flag 354 uint32_t multiprocessing_timeout_interval_; // Multiprocessing timeout interval in seconds 355 std::string autotune_json_filepath_; // Filepath name of the final AutoTune Configuration JSON file 356 bool dynamic_shape_{false}; 357 bool fast_recovery_{true}; // Used for failover scenario to recover quickly or produce same augmentations 358 bool debug_mode_flag_{false}; // Indicator for debug mode 359 ErrorSamplesMode error_samples_mode_{ErrorSamplesMode::kReturn}; // The method to process erroneous samples 360 }; 361 } // namespace dataset 362 } // namespace mindspore 363 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_ 364