/**
 * Copyright 2019 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_
#define MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_

#include <atomic>
#include <ostream>
#include <sstream>
#include <string>

#include <nlohmann/json.hpp>

#include "minddata/dataset/include/dataset/constants.h"
#include "minddata/dataset/util/path.h"
#include "minddata/dataset/util/status.h"

// Config settings for the client-side
// example config file:
// {
//    "numParallelWorkers": 3
// }
//

namespace mindspore {
namespace dataset {
// The ConfigManager is a class for managing default values.  When a user is constructing any objects
// in the framework, often they may choose to omit some settings instead of overriding them.
// This class manages some of the default values, for cases when the user does not manually specify
// those values.
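//
// Illustrative usage (a minimal sketch based only on the declarations below, not code taken from the
// framework; how the framework actually creates and shares its ConfigManager instance is out of scope):
//
//   ConfigManager config;                                // constructed with built-in defaults
//   Status rc = config.set_num_parallel_workers(8);      // override one default programmatically
//   if (rc.IsOk()) {                                     // Status is defined in minddata/dataset/util/status.h
//     int32_t workers = config.num_parallel_workers();   // read the setting back
//   }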
class ConfigManager {
 public:
  ConfigManager();

  // destructor
  ~ConfigManager() = default;

  // A print method typically used for debugging
  // @param out - The output stream to write output to
  void Print(std::ostream &out) const;

  // << Stream output operator overload
  // @note This allows you to write the debug print info using stream operators
  // @param out - reference to the output stream being overloaded
  // @param cS - reference to the ConfigManager to display
  // @return - the output stream must be returned
  friend std::ostream &operator<<(std::ostream &out, const ConfigManager &cS) {
    cS.Print(out);
    return out;
  }

  // Another debug print helper.  Converts the print info to a string for you.
  // @return The string version of the debug print
  std::string ToString() {
    std::stringstream ss;
    ss << *this;
    return ss.str();
  }

  // Loads a json file with the default settings and populates all the settings
  // @param settingsFile - A json file with a set of default settings
  // @return Status error code
  Status LoadFile(const std::string &settingsFile);
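  // A minimal sketch (not from the framework) of consuming the example file shown at the top of this
  // header via LoadFile; the file name used here is hypothetical:
  //
  //   ConfigManager config;
  //   Status rc = config.LoadFile("dataset_client_config.json");  // contains {"numParallelWorkers": 3}
  //   if (rc.IsOk()) {
  //     int32_t workers = config.num_parallel_workers();  // reflects the value loaded from the file
  //   }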

  // getter function
  // @return The number of workers setting
  int32_t num_parallel_workers() const { return num_parallel_workers_; }

  // getter function
  // @return The queue size of the operator's output connector
  int32_t op_connector_size() const { return op_connector_size_; }

  // getter function
  // @return The number of batches that will be sent to the device
  int64_t sending_batches() const { return sending_batches_; }

  // getter function
  // @return The internal worker-to-master connector queue size
  int32_t worker_connector_size() const { return worker_connector_size_; }

  // getter function
  // @return The number of CPU threads
  int32_t num_cpu_threads() const { return num_cpu_threads_; }

  // getter function
  // @return The hostname of the cache server
  std::string cache_host() const { return cache_host_; }

  // getter function
  // @return The port of the cache server
  int32_t cache_port() const { return cache_port_; }

  /// getter function
  /// \return Number of tcp/ip connections
  int32_t num_connections() const { return num_connections_; }

  /// getter function
  /// \return Prefetch size
  int32_t prefetch_size() const { return prefetch_size_; }

  /// getter function
  /// \return auto_num_workers_
  bool auto_num_workers() const { return auto_num_workers_; }

  // setter function
  // @param num_parallel_workers - The setting to apply to the config
  // @return Status error code
  Status set_num_parallel_workers(int32_t num_parallel_workers);

  // setter function
  // @param connector_size - The setting to apply to the config
  void set_worker_connector_size(int32_t connector_size);

  // setter function
  // @param connector_size - The setting to apply to the config
  void set_op_connector_size(int32_t connector_size);

  // setter function
  // @param sending_batches - The setting to apply to the config
  void set_sending_batches(int64_t sending_batches);

  // setter function
  // @param cache_host - The hostname of the cache server
  void set_cache_host(std::string cache_host);

  // setter function
  // @param cache_port - The port of the cache server
  void set_cache_port(int32_t cache_port);

  /// setter function
  /// \param num_connections
  void set_num_connections(int32_t num_connections);

  /// setter function
  /// \param prefetch_size
  void set_prefetch_size(int32_t prefetch_size);

  /// setter function
  /// \param numa_enable
  void set_numa_enable(bool numa_enable);

  /// getter function
  /// Now we want to separate the numa link to _c_dataengine in the CMakeLists,
  /// so we want the user to choose whether to enable the numa switch.
  /// \return The current numa switch state.
  bool numa_enable() const { return numa_enable_; }

  // getter function
  // This rank_id is for numa and device_queue; one process works with only one rank_id.
  // For the standalone scenario, this rank_id may come from the env 'CUDA_VISIBLE_DEVICES',
  // but for the distributed scenario, this rank_id comes from _get_global_rank() in python.
  // @return The current device id; each process works with only one rank_id.
  int32_t rank_id() const { return rank_id_; }

  // setter function
  // @param rank_id - Set the current device id
  void set_rank_id(int32_t rank_id);

  // getter function
  // @return The default seed
  uint32_t seed() const;

  // setter function
  // @param seed - The default seed to use
  void set_seed(uint32_t seed);

  // setter function
  // @param interval - The setting to apply to the config
  void set_monitor_sampling_interval(uint32_t interval);

  // getter function
  // @return The interval of monitor sampling
  int32_t monitor_sampling_interval() const { return monitor_sampling_interval_; }

  // setter function
  // @param stop_profiler - The setting to apply to the config
  void stop_dataset_profiler(bool stop_profiler);

  // getter function
  // @return The status of stop profiler
  bool stop_profiler_status() const { return stop_profiler_; }

  // setter function
  // @param file_ready - The setting to apply to the config
  void set_profiler_file_status(bool file_ready);

  // getter function
  // @return The status of the profiler file, whether it has been generated
  bool get_profiler_file_status() const { return file_ready_; }

  // setter function
  // @param auto_num_workers - whether to assign threads to each op automatically
  void set_auto_num_workers(bool auto_num_workers) { auto_num_workers_ = auto_num_workers; }

  // setter function
  // This function will be called when a distributed sampler (RT and Obj) is created and will be used by AutoWorkerPass.
  // This is to get around the limitation of PreBuildSampler (which doesn't have a getter for sharding params).
  // @param num_shards
  void set_num_shards_for_auto_num_workers(int32_t num_shards) { auto_num_workers_num_shards_ = num_shards; }

  // getter function, will be called by AutoWorkerPass; user discretion is advised when using AutoNumWorker
  // @return The number of shards used by AutoNumWorker
  int32_t get_num_shards_for_auto_num_workers() const { return auto_num_workers_num_shards_; }

  // setter function
  // @param timeout - The setting to apply to the config
  void set_callback_timeout(uint32_t timeout);

  // getter function
  // @return The timeout DSWaitedCallback would wait for before raising an error
  int32_t callback_timeout() const { return callback_timout_; }

  // getter function
  // E.g. 0 corresponds to a 1:1:1 ratio of num_worker among leaf, batch and map.
  // Please refer to AutoWorkerPass for detail on what each option is.
  // @return The experimental config used by AutoNumWorker; each value refers to a different setup configuration
  uint8_t get_auto_worker_config() { return auto_worker_config_; }

  // setter function
  // E.g. setting the value 0 corresponds to a 1:1:1 ratio of num_worker among leaf, batch and map.
  // Please refer to AutoWorkerPass for detail on what each option is.
  // @param cfg - The experimental config used by AutoNumWorker; each value refers to a different setup configuration
  void set_auto_worker_config_(uint8_t cfg) { auto_worker_config_ = cfg; }

  // setter function
  // @param enable - To enable multiprocessing to use shared memory
  void set_enable_shared_mem(bool enable) { enable_shared_mem_ = enable; }

  // getter function
  // @return - Flag to indicate whether shared memory for multi-processing is enabled
  bool enable_shared_mem() { return enable_shared_mem_; }

 private:
  int32_t num_parallel_workers_;
  int32_t worker_connector_size_;
  int32_t op_connector_size_;
  int64_t sending_batches_;
  // This rank_id is for numa and device_queue; one process works with only one rank_id.
  // For the standalone scenario, this rank_id may come from the env 'CUDA_VISIBLE_DEVICES',
  // but for the distributed scenario, this rank_id comes from _get_global_rank() in python.
  int32_t rank_id_;
  uint32_t seed_;
  uint32_t monitor_sampling_interval_;
  std::atomic_bool stop_profiler_;
  std::atomic_bool file_ready_;
  uint32_t callback_timout_;
  std::string cache_host_;
  int32_t cache_port_;
  int32_t num_connections_;
  bool numa_enable_;
  int32_t prefetch_size_;
  bool auto_num_workers_;
  int32_t num_cpu_threads_;
  int32_t auto_num_workers_num_shards_;
  uint8_t auto_worker_config_;
  bool enable_shared_mem_;
  // Private helper function that takes a nlohmann json object and populates the settings
  // @param j - The nlohmann json info
  Status FromJson(const nlohmann::json &j);
};
}  // namespace dataset
}  // namespace mindspore

#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_CORE_CONFIG_MANAGER_H_