/**
 * Copyright 2019-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_

#include <atomic>
#include <ostream>
#include <sstream>
#include <string>

#include <nlohmann/json.hpp>

#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/status.h"

// Config settings for the client-side
// example config file:
// {
//    "numParallelWorkers": 3
// }
//

namespace mindspore {
namespace dataset {
const char kEmptyString[] = "";
const char kJsonExtension[] = ".json";

// The ConfigManager is a class for managing default values.  When a user is constructing any objects
// in the framework, often they may choose to omit some settings instead of overriding them.
// This class manages some of the default values, for cases when the user does not manually specify
// those values.
class ConfigManager {
 public:
  ConfigManager();

  // destructor
  ~ConfigManager() = default;

  // A print method typically used for debugging
  // @param out - The output stream to write output to
  void Print(std::ostream &out) const;

  // << Stream output operator overload
  // @notes This allows you to write the debug print info using stream operators
  // @param out - reference to the output stream being overloaded
  // @param cS - reference to the ConfigManager to display
  // @return - the output stream
  friend std::ostream &operator<<(std::ostream &out, const ConfigManager &cS) {
    cS.Print(out);
    return out;
  }

  // Another debug print helper.  Converts the print info to a string for you.
  // @return The string version of the debug print
  std::string ToString() const {
    std::stringstream ss;
    ss << *this;
    return ss.str();
  }

  // Loads a json file with the default settings and populates all the settings
  // @param settingsFile - A json file with a set of default settings
  // @return Status error code
  Status LoadFile(const std::string &settingsFile);
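  // A minimal usage sketch for LoadFile (illustrative only; the variable name and file path below are
  // assumptions, not part of this API; the json key shown is the one from the example file above):
  //   ConfigManager cfg;
  //   Status rc = cfg.LoadFile("/path/to/dataset_config.json");
  //   if (rc.IsOk()) {
  //     int32_t workers = cfg.num_parallel_workers();  // value populated from "numParallelWorkers"
  //   }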

  // getter function
  // @return The number of workers setting
  int32_t num_parallel_workers() const { return num_parallel_workers_; }

  // getter function
  // @return The queue size of the operator's output connector
  int32_t op_connector_size() const { return op_connector_size_; }

  // getter function
  // @return The number of batches that will be sent to the device
  int64_t sending_batches() const { return sending_batches_; }

  // getter function
  // @return The internal worker-to-master connector queue size
  int32_t worker_connector_size() const { return worker_connector_size_; }

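  // getter function
  // @return The number of CPU threads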
  int32_t num_cpu_threads() const { return num_cpu_threads_; }

  // getter function
  // @return The hostname of cache server
  std::string cache_host() const { return cache_host_; }

  // getter function
  // @return The port of cache server
  int32_t cache_port() const { return cache_port_; }

  /// getter function
  /// \return Number of tcp/ip connections
  int32_t num_connections() const { return num_connections_; }

  /// getter function
  /// \return Prefetch size
  int32_t cache_prefetch_size() const { return cache_prefetch_size_; }

  /// getter function
  /// \return auto_num_workers_
  bool auto_num_workers() const { return auto_num_workers_; }

  // setter function
  // @param num_parallel_workers - The setting to apply to the config
  // @return Status error code
  Status set_num_parallel_workers(int32_t num_parallel_workers);

  // setter function
  // @param connector_size - The setting to apply to the config
  void set_worker_connector_size(int32_t connector_size);

  // setter function
  // @param connector_size - The setting to apply to the config
  void set_op_connector_size(int32_t connector_size);

  // setter function
  // @param sending_batches - The setting to apply to the config
  void set_sending_batches(int64_t sending_batches);

  // setter function
  // @param cache_host - The hostname of cache server
  void set_cache_host(std::string cache_host);

  // setter function
  // @param cache_port - The port of cache server
  void set_cache_port(int32_t cache_port);

  /// setter function
  /// \param num_connections
  void set_num_connections(int32_t num_connections);

  /// setter function
  /// \param cache_prefetch_size
  void set_cache_prefetch_size(int32_t cache_prefetch_size);

  /// setter function
  /// \param numa_enable
  void set_numa_enable(bool numa_enable);

  /// getter function
  /// The numa link to _c_dataengine is kept separate in the CMakeLists,
  /// so the user can choose whether to enable the numa switch.
  /// \return The current numa switch state.
  bool numa_enable() const { return numa_enable_; }

  // getter function
  // This rank_id is for numa and device_queue; one process works with only one rank_id.
  // For the standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
  // but for the distributed scenario, this rank_id comes from _get_global_rank() in python.
  // @return The current device id; each process works with only one rank_id.
  int32_t rank_id() const { return rank_id_; }

  // setter function
  // @param rank_id - Set the current device id
  void set_rank_id(int32_t rank_id);

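  // getter function
  // @return The default seed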
  uint32_t seed() const;

  // setter function
  // @param seed - The default seed to use
  void set_seed(uint32_t seed);

  // setter function
  // @param interval - The setting to apply to the config
  void set_monitor_sampling_interval(uint32_t interval);

  // getter function
  // @return The interval of monitor sampling
  uint32_t monitor_sampling_interval() const { return monitor_sampling_interval_; }

  // setter function
  // @param auto_num_workers - whether to assign threads to each op automatically
  void set_auto_num_workers(bool auto_num_workers) { auto_num_workers_ = auto_num_workers; }

  // setter function
  // this function will be called when a distributed sampler (RT and Obj) is created and will be used by AutoWorkerPass
  // This is to get around the limitation of PreBuildSampler (which doesn't have a getter for sharding params)
  // @param num_shards
  void set_num_shards_for_auto_num_workers(int32_t num_shards) { auto_num_workers_num_shards_ = num_shards; }

  // getter function, will be called by AutoNumWorker; user discretion is advised when using AutoNumWorker
  // @return The number of shards used by AutoNumWorker
  int32_t get_num_shards_for_auto_num_workers() const { return auto_num_workers_num_shards_; }

  // setter function
  // @param timeout - The setting to apply to the config
  void set_callback_timeout(uint32_t timeout);

  // getter function
  // @return The timeout DSWaitedCallback would wait for before raising an error
  uint32_t callback_timeout() const { return callback_timout_; }

  // getter function
  // E.g. 0 corresponds to a 1:1:1 ratio of num_worker among leaf, batch and map.
  // Please refer to AutoWorkerPass for detail on what each option is.
  // @return The experimental config used by AutoNumWorker; each value refers to a different setup configuration
  uint8_t get_auto_worker_config() const { return auto_worker_config_; }

  // setter function
  // E.g. setting the value to 0 corresponds to a 1:1:1 ratio of num_worker among leaf, batch and map.
  // Please refer to AutoWorkerPass for detail on what each option is.
  // @param cfg - The experimental config used by AutoNumWorker; each value refers to a different setup configuration
  void set_auto_worker_config_(uint8_t cfg) { auto_worker_config_ = cfg; }

  // setter function
  // @param enable - To enable multiprocessing to use shared memory
  void set_enable_shared_mem(bool enable) { enable_shared_mem_ = enable; }

  // getter function
  // @return - Flag to indicate whether shared memory for multi-processing is enabled
  bool enable_shared_mem() const { return enable_shared_mem_; }

  // setter function
  // @param offload - To enable automatic offloading of dataset ops
  void set_auto_offload(bool offload) { auto_offload_ = offload; }

  // getter function
  // @return - Flag to indicate whether automatic offloading is enabled for the dataset
  bool get_auto_offload() const { return auto_offload_; }

  // setter function
  // @param enable - To enable autotune
  // @param save_autoconfig - True if the AutoTune data pipeline configuration should be saved
  // @param json_filepath - JSON filepath where the final AutoTune data pipeline configuration will be generated
  // @return Status error code
  Status set_enable_autotune(bool enable, bool save_autoconfig, const std::string &json_filepath);
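  // A minimal usage sketch (illustrative only; the instance name and output path are assumptions):
  //   Status rc = cfg.set_enable_autotune(true, true, "/tmp/autotune_pipeline.json");
  //   // afterwards enable_autotune(), save_autoconfig() and get_autotune_json_filepath() reflect these values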

  // getter function
  // @return - Flag to indicate whether autotune is enabled
  bool enable_autotune() const { return enable_autotune_; }

  // getter function
  // @return - Flag to indicate whether to save AutoTune configuration
  bool save_autoconfig() const { return save_autoconfig_; }

  // getter function
  // @return - The final AutoTune configuration JSON filepath
  std::string get_autotune_json_filepath() { return autotune_json_filepath_; }

  // getter function
  // @return - autotune interval in steps
  int64_t autotune_interval() const { return autotune_interval_; }

  // setter function
  // @param interval - autotune interval in steps
  void set_autotune_interval(int64_t interval) { autotune_interval_ = interval; }

  // setter function
  // @param enable - To enable watchdog python thread
  void set_enable_watchdog(bool enable) { enable_watchdog_ = enable; }

  // getter function
  // @return - Flag to indicate whether watchdog python thread is enabled
  bool enable_watchdog() const { return enable_watchdog_; }

  // getter function
  // @return - multiprocessing timeout interval in seconds
  uint32_t multiprocessing_timeout_interval() const { return multiprocessing_timeout_interval_; }

  // setter function
  // @param interval - multiprocessing timeout interval in seconds
  void set_multiprocessing_timeout_interval(uint32_t interval) { multiprocessing_timeout_interval_ = interval; }

  // setter function
  // @param is_dynamic - Indicate whether the dataset is dynamic-shape
  void set_dynamic_shape(bool is_dynamic) { dynamic_shape_ = is_dynamic; }

  // getter function
  // @return - Flag to indicate whether the dataset is dynamic-shape
  bool dynamic_shape() const { return dynamic_shape_; }

  // setter function
  // @notes The user must also set the seed to be able to get the same augmentations
  // @notes Fast recovery can cause slightly different random augmentations than the original run
  //     (System default = true)
  // @param fast_recovery - Set whether the MD pipeline recovers fast in failover reset
  void set_fast_recovery(const bool fast_recovery) { fast_recovery_ = fast_recovery; }

  // getter function
  // @return - Flag to indicate whether the MD pipeline recovers fast in failover reset
  bool fast_recovery() const { return fast_recovery_; }

  // setter function
  // @param debug_mode_flag - Set whether debug mode is on. When enabled, the dataset pipeline runs synchronously and
  //    sequentially.
  void set_debug_mode(const bool debug_mode_flag) { debug_mode_flag_ = debug_mode_flag; }

  // getter function
  // @return - Flag to indicate whether the debug mode is on
  bool get_debug_mode() const { return debug_mode_flag_; }

  // setter function
  // @param error_samples_mode - Set the method in which erroneous samples should be processed
  //     (System default = ErrorSamplesMode::kReturn)
  // @notes For replacement of erroneous samples, MD will select a deterministic but "random" sample.
  void set_error_samples_mode(const ErrorSamplesMode error_samples_mode) { error_samples_mode_ = error_samples_mode; }

  // getter function
  // @return - The method in which erroneous samples should be processed in a dataset pipeline
  // @notes This method is used by the external configuration API, which returns an integer type
  int32_t get_error_samples_mode() const { return static_cast<int>(error_samples_mode_); }

  // getter function
  // @return - The method in which erroneous samples should be processed in a dataset pipeline
  // @notes This method is used for internal processing, using the enum type
  ErrorSamplesMode error_samples_mode() const { return error_samples_mode_; }

 private:
  // Private helper function that takes an nlohmann json object and populates the settings
  // @param j - The nlohmann json object holding the settings
  Status FromJson(const nlohmann::json &j);

  int32_t num_parallel_workers_;
  int32_t worker_connector_size_;
  int32_t op_connector_size_;
  int64_t sending_batches_;
  // This rank_id is for numa and device_queue; one process works with only one rank_id.
  // For the standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',
  // but for the distributed scenario, this rank_id comes from _get_global_rank() in python.
  int32_t rank_id_;
  uint32_t seed_;
  uint32_t monitor_sampling_interval_;
  uint32_t callback_timout_;
  std::string cache_host_;
  int32_t cache_port_;
  int32_t num_connections_;
  bool numa_enable_;
  int32_t cache_prefetch_size_;
  bool auto_num_workers_;
  int32_t num_cpu_threads_;
  int32_t auto_num_workers_num_shards_;
  uint8_t auto_worker_config_;
  bool enable_shared_mem_;
  bool auto_offload_;
  bool enable_autotune_;
  bool save_autoconfig_;  // True if should save AutoTune configuration
  int64_t autotune_interval_;
  bool enable_watchdog_;                       // Watchdog python thread enabled flag
  uint32_t multiprocessing_timeout_interval_;  // Multiprocessing timeout interval in seconds
  std::string autotune_json_filepath_;         // Filepath name of the final AutoTune Configuration JSON file
  bool dynamic_shape_{false};
  bool fast_recovery_{true};     // Used for failover scenario to recover quickly or produce same augmentations
  bool debug_mode_flag_{false};  // Indicator for debug mode
  ErrorSamplesMode error_samples_mode_{ErrorSamplesMode::kReturn};  // The method to process erroneous samples
};
}  // namespace dataset
}  // namespace mindspore
#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_