1# Copyright 2020 Huawei Technologies Co., Ltd 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================ 15"""Dataset help for minddata dataset""" 16import math 17import os 18 19from mindspore._checkparam import Validator 20from mindspore.common.dtype import pytype_to_dtype 21from .. import context, nn 22from ._utils import _exec_datagraph, _get_types_and_shapes, _construct_tensor_list 23from ..parallel._utils import _get_device_num, _get_global_rank, _need_to_full, _to_full_shapes, _get_pipeline_stages 24from ..ops import operations as P 25 26 27def _send_data(dataset, epoch_num): 28 """Engine dataset to write data to tdt queue.""" 29 if not hasattr(dataset, '__has_sent__'): 30 exec_dataset = dataset.__transfer_dataset__ 31 exec_dataset.send(epoch_num) 32 dataset.__has_sent__ = True 33 34 35def _send_data_no_flag(dataset, epoch_num): 36 """Engine dataset to write data to tdt queue directly.""" 37 exec_dataset = dataset.__transfer_dataset__ 38 exec_dataset.send(epoch_num) 39 40 41def _dynamic_sink_scenario(dataset, dataset_iter): 42 """Special scenario with dynamic shape and sink_size=1.""" 43 flag = False 44 ms_role = os.getenv("MS_ROLE") 45 if hasattr(dataset_iter, "sink_size") and \ 46 dataset_iter.sink_size == 1 and \ 47 dataset.get_dataset_size() != 1 and \ 48 hasattr(dataset_iter, "sink_count") and \ 49 dataset_iter.sink_count == 1 and \ 50 context.get_context("device_target") == "Ascend" and \ 51 context.get_context("mode") == context.GRAPH_MODE and \ 52 ms_role != "MS_WORKER": 53 flag = True 54 return flag 55 56 57class _DataWrapper(nn.Cell): 58 """ 59 Wraps the input network with a dataset which automatically fetches data with 'GetNext' function from the 60 dataset channel 'queue_name' and performs the forward computation. 61 """ 62 63 def __init__(self, network, dataset_types, dataset_shapes, queue_name, min_shapes=None, max_shapes=None): 64 super(_DataWrapper, self).__init__(auto_prefix=False, flags=network.get_flags()) 65 # Also copy the flag in `network` construct 66 flags = getattr(network.__class__.construct, "_mindspore_flags", {}) 67 self.info = (dataset_types, dataset_shapes) 68 self.add_flags(**flags) 69 self.get_next = P.GetNext(dataset_types, dataset_shapes, len(dataset_types), queue_name) 70 if min_shapes is not None and max_shapes is not None: 71 Validator.check_value_type("min_shapes", min_shapes, [list, tuple]) 72 Validator.check_value_type("max_shapes", max_shapes, [list, tuple]) 73 self.get_next.add_prim_attr("min_shapes", min_shapes) 74 self.get_next.add_prim_attr("max_shapes", max_shapes) 75 self.network = network 76 77 def construct(self): 78 outputs = self.get_next() 79 return self.network(*outputs) 80 81 82def _generate_dataset_sink_mode_net(network, dataset_shapes, dataset_types, queue_name, 83 min_shapes=None, max_shapes=None): 84 if not isinstance(network, _DataWrapper): 85 network = _DataWrapper(network, dataset_types, dataset_shapes, queue_name, min_shapes, max_shapes) 86 return network 87 88 89def has_dynamic_shape(dataset_shapes): 90 for shape in dataset_shapes: 91 if -1 in shape: 92 return True 93 return False 94 95 96def _generate_network_with_dataset(network, dataset_helper, queue_name): 97 dataset_types, dataset_shapes = dataset_helper.types_shapes() 98 (min_shapes, max_shapes) = (None, None) if not has_dynamic_shape(dataset_shapes) \ 99 else dataset_helper.dynamic_min_max_shapes() 100 network = _generate_dataset_sink_mode_net(network, dataset_shapes, dataset_types, 101 queue_name, min_shapes, max_shapes) 102 return network 103 104 105def connect_network_with_dataset(network, dataset_helper): 106 """ 107 Connect the `network` with dataset in `dataset_helper`. 108 109 This function wraps the input network with 'GetNext' so that the data can be fetched automatically from the 110 data channel corresponding to the 'queue_name' and passed to the input network during forward computation. 111 112 Note: 113 In the case of running the network on Ascend/GPU in graph mode, this function will wrap the input network with 114 'GetNext', in other cases, the input network will be returned with no change. 115 The 'GetNext' is required to get data only in sink mode, so this function is not applicable to no-sink mode. 116 117 Args: 118 network (Cell): The training network for dataset. 119 dataset_helper (DatasetHelper): A class to process the MindData dataset, it provides the type, shape and queue 120 name of the dataset to wrap the `GetNext`. 121 122 Returns: 123 Cell, a new network wrapped with 'GetNext' in the case of running the task on Ascend in graph mode, otherwise 124 it is the input network. 125 126 Supported Platforms: 127 ``Ascend`` ``GPU`` 128 129 Examples: 130 >>> from mindspore import DatasetHelper 131 >>> 132 >>> # call create_dataset function to create a regular dataset, refer to mindspore.dataset 133 >>> train_dataset = create_custom_dataset() 134 >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=True) 135 >>> net = Net() 136 >>> net_with_get_next = connect_network_with_dataset(net, dataset_helper) 137 """ 138 139 dataset_iter = dataset_helper.iter 140 dataset = dataset_iter.dataset 141 142 if isinstance(dataset_iter, _DatasetIterNormal): 143 raise RuntimeError("The API 'connect_network_with_dataset' should be called in dataset sink mode.") 144 145 ms_role = os.getenv("MS_ROLE") 146 if ms_role in ("MS_PSERVER", "MS_SCHED"): 147 return network 148 149 queue_name = dataset.__transfer_dataset__.queue_name 150 if _dynamic_sink_scenario(dataset, dataset_iter): 151 if not hasattr(dataset_iter, '__network__'): 152 dataset_iter.__network__ = network 153 network = dataset_iter.__network__ 154 155 dataset_types, dataset_shapes = dataset_helper.get_data_info() 156 dataset_types = [pytype_to_dtype(x) for x in dataset_types] 157 158 key = str(dataset_types) + str(dataset_shapes) 159 if hasattr(dataset_iter, '__network_manage__') and key in dataset_iter.__network_manage__: 160 network = dataset_iter.__network_manage__[key] 161 else: 162 if _need_to_full(): 163 device_num = _get_device_num() // _get_pipeline_stages() 164 dataset_shapes = _to_full_shapes(dataset_shapes, device_num) 165 166 network = _generate_dataset_sink_mode_net(network, dataset_shapes, dataset_types, queue_name) 167 dataset_iter.__network_manage__ = dataset_iter.__network_manage__ if hasattr( 168 dataset_iter, '__network_manage__') else dict() 169 dataset_iter.__network_manage__[key] = network 170 return network 171 172 if not hasattr(dataset, '__me_inited__') and \ 173 not context.get_context("enable_ge") and \ 174 context.get_context("device_target") in ("Ascend", "GPU"): 175 dataset.__me_inited__ = True 176 network = _generate_network_with_dataset(network, dataset_helper, queue_name) 177 178 if hasattr(dataset_iter, "sink_size") and \ 179 dataset_iter.sink_size == 1 and \ 180 dataset.get_dataset_size() != 1 and \ 181 hasattr(dataset_iter, "sink_count") and \ 182 dataset_iter.sink_count == 1 and \ 183 context.get_context("device_target") == "Ascend" and \ 184 context.get_context("mode") == context.PYNATIVE_MODE: 185 dataset_helper.get_data_info() 186 187 return network 188 189 190class DatasetHelper: 191 """ 192 DatasetHelper is a class to process the MindData dataset and provides the information of dataset. 193 194 According to different contexts, change the iterations of dataset and use the same iteration for loop in different 195 contexts. 196 197 Note: 198 The iteration of DatasetHelper will provide one epoch data. 199 200 Args: 201 dataset (Dataset): The dataset iterator. The dataset can be generated by dataset generator API in 202 :class:`mindspore.dataset`, such as :class:`mindspore.dataset.ImageFolderDataset`. 203 dataset_sink_mode (bool): If the value is true, GetNext is employed to fetch the data, otherwise the data is fed 204 from host. Default: True. 205 sink_size (int): Control the amount of data in each sink. 206 If sink_size=-1, sink the complete dataset for each epoch. 207 If sink_size>0, sink sink_size data for each epoch. 208 Default: -1. 209 epoch_num (int): The number of passes of the entire dataset to be sent. Default: 1. 210 211 Examples: 212 >>> from mindspore import DatasetHelper 213 >>> 214 >>> train_dataset = create_custom_dataset() 215 >>> set_helper = DatasetHelper(train_dataset, dataset_sink_mode=False) 216 >>> 217 >>> net = Net() 218 >>> # Object of DatasetHelper is iterable 219 >>> for next_element in set_helper: 220 ... # `next_element` includes data and label, using data to run the net 221 ... data = next_element[0] 222 ... net(data) 223 """ 224 225 def __init__(self, dataset, dataset_sink_mode=True, sink_size=-1, epoch_num=1): 226 dataset_sink_mode = Validator.check_bool(dataset_sink_mode) 227 Validator.check_is_int(sink_size) 228 if sink_size < -1 or sink_size == 0: 229 raise ValueError("The 'sink_size' must be -1 or positive, but got sink_size {}.".format(sink_size)) 230 if sink_size == -1: 231 sink_size = dataset.get_dataset_size() 232 233 if dataset_sink_mode: 234 if context.get_context("enable_ge"): 235 iterclass = _DatasetIterGE 236 else: 237 if context.get_context("mode") == context.GRAPH_MODE: 238 ms_role = os.getenv("MS_ROLE") 239 if ms_role in ("MS_PSERVER", "MS_SCHED"): 240 iterclass = _DatasetIterPSServer 241 elif ms_role == "MS_WORKER": 242 iterclass = _DatasetIterPSWork 243 elif (context.get_context("device_target") == "Ascend") or \ 244 (context.get_context("device_target") == "GPU"): 245 iterclass = _DatasetIterMSLoopSink 246 elif context.get_context("device_target") == "CPU": 247 raise RuntimeError("Currently dataset sink mode is not supported when the device " 248 "target is CPU, please set dataset sink mode to False.") 249 else: 250 iterclass = _DatasetIterPyNative 251 self.iter = iterclass(dataset, sink_size, epoch_num) 252 else: 253 iterclass = _DatasetIterNormal 254 self.iter = iterclass(dataset, epoch_num=epoch_num) 255 256 def __iter__(self): 257 return self.iter.__iter__() 258 259 # A temp solution for loop sink. Delete later 260 def types_shapes(self): 261 """ 262 Get the types and shapes from dataset on the current configuration. 263 264 Examples: 265 >>> from mindspore import DatasetHelper 266 >>> 267 >>> train_dataset = create_custom_dataset() 268 >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=False) 269 >>> 270 >>> types, shapes = dataset_helper.types_shapes() 271 """ 272 return self.iter.types_shapes() 273 274 def sink_size(self): 275 """ 276 Get sink_size for each iteration. 277 278 Examples: 279 >>> from mindspore import DatasetHelper 280 >>> 281 >>> train_dataset = create_custom_dataset() 282 >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=True, sink_size=-1) 283 >>> 284 >>> # if sink_size==-1, then will return the full size of source dataset. 285 >>> sink_size = dataset_helper.sink_size() 286 """ 287 return self.iter.get_sink_size() 288 289 def stop_send(self): 290 """Stop send data about data sink.""" 291 self.iter.stop_send() 292 293 def release(self): 294 """Free up resources about data sink.""" 295 self.iter.release() 296 297 def continue_send(self): 298 """Continue to send data to device at the beginning of epoch.""" 299 self.iter.continue_send() 300 301 def get_data_info(self): 302 """ 303 In sink mode, it returns the types and shapes of the current data. 304 Generally, it works in dynamic shape scenarios. 305 306 Examples: 307 >>> from mindspore import DatasetHelper 308 >>> 309 >>> train_dataset = create_custom_dataset() 310 >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=True) 311 >>> 312 >>> types, shapes = dataset_helper.get_data_info() 313 """ 314 return self.iter.get_data_info() 315 316 def dynamic_min_max_shapes(self): 317 """ 318 Return the minimum and maximum data length of dynamic source dataset. 319 320 Examples: 321 >>> from mindspore import DatasetHelper 322 >>> 323 >>> train_dataset = create_custom_dataset() 324 >>> # config dynamic shape 325 >>> dataset.set_dynamic_columns(columns={"data1": [16, None, 83], "data2": [None]}) 326 >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=True) 327 >>> 328 >>> min_shapes, max_shapes = dataset_helper.dynamic_min_max_shapes() 329 """ 330 return self.iter.dynamic_min_max_shapes() 331 332 333class _DatasetIter: 334 """Base iter for dataset helper""" 335 336 def __init__(self, dataset, sink_size, epoch_num): 337 self.dataset = dataset 338 self.sink_size = sink_size 339 self.sink_count = self.get_sink_count(dataset) 340 341 if not hasattr(dataset, '__transfer_dataset__'): 342 if hasattr(dataset, '__loop_size__'): 343 ms_role = os.getenv("MS_ROLE") 344 # PS mode does not support loop sink and need get the real sink size. 345 if ms_role != "MS_WORKER": 346 self.sink_size = dataset.__loop_size__ 347 create_data_info_queue = (sink_size == 1 and self.sink_count == 1 and context.get_context( 348 "device_target") == "Ascend") 349 dataset.__transfer_dataset__ = _exec_datagraph(dataset, self.sink_size, 350 create_data_info_queue=create_data_info_queue) 351 352 if not hasattr(dataset, '__no_send__'): 353 _send_data(dataset, epoch_num) 354 else: 355 _send_data_no_flag(dataset, epoch_num) 356 357 self.stop_send = dataset.__transfer_dataset__.stop_send 358 self.release = dataset.__transfer_dataset__.release 359 self.continue_send = dataset.__transfer_dataset__.continue_send 360 self.get_data_info = dataset.__transfer_dataset__.get_data_info 361 self.dynamic_min_max_shapes = dataset.dynamic_min_max_shapes 362 self.dataset_types, self.dataset_shapes = _get_types_and_shapes(dataset) 363 364 def __iter__(self): 365 self.index = 0 366 return self 367 368 def __next__(self): 369 if self.index >= self.sink_count: 370 raise StopIteration() 371 self.index += 1 372 return self.op() 373 374 def types_shapes(self): 375 return self.dataset_types, self.dataset_shapes 376 377 def get_sink_count(self, dataset): 378 sink_count = 1 379 if hasattr(dataset, '__loop_size__'): 380 loop_size = dataset.__loop_size__ 381 if loop_size <= dataset.get_dataset_size() and dataset.get_dataset_size() % loop_size != 0: 382 raise ValueError(f"Dataset size {dataset.get_dataset_size()} and 'sink_size' {loop_size} " 383 f"are not matched, dataset size should be divisible by 'sink_size'.") 384 sink_count = math.ceil(dataset.get_dataset_size() / loop_size) 385 return sink_count 386 387 def get_sink_size(self): 388 """get sink_size to device""" 389 sink_size = 1 390 ms_role = os.getenv("MS_ROLE") 391 if hasattr(self.dataset, '__loop_size__'): 392 sink_size = self.dataset.__loop_size__ 393 elif ms_role == "MS_WORKER": 394 # PS mode does not support loop sink. 395 sink_size = 1 396 else: 397 if context.get_context("enable_ge") or context.get_context("device_target") == "Ascend" \ 398 or context.get_context("device_target") == "GPU": 399 if self.sink_size > 0: 400 sink_size = self.sink_size 401 else: 402 sink_size = self.dataset.get_dataset_size() 403 return sink_size 404 405 406class _DatasetIterGE(_DatasetIter): 407 """Iter for GE.""" 408 409 def __init__(self, dataset, sink_size, epoch_num): 410 super().__init__(dataset, sink_size, epoch_num) 411 self.sink_count = self.get_sink_count(dataset) 412 batch_expand_num = 1 413 if _need_to_full(): 414 batch_expand_num = _get_device_num() // _get_pipeline_stages() 415 tensor_list_run = _construct_tensor_list(self.dataset_types, self.dataset_shapes, batch_expand_num) 416 417 def op(): 418 return tensor_list_run 419 420 self.op = op 421 422 423class _DatasetIterPyNative(_DatasetIter): 424 """Iter for context (mode=PYNATIVE_MODE).""" 425 426 def __init__(self, dataset, sink_size, epoch_num): 427 super().__init__(dataset, sink_size, epoch_num) 428 if sink_size > 0: 429 self.sink_count = sink_size 430 else: 431 self.sink_count = dataset.get_dataset_size() 432 433 def op(): 434 return tuple() 435 436 self.op = op 437 438 439class _DatasetIterMSLoopSink(_DatasetIter): 440 """Iter for context (device_target=Ascend)""" 441 442 def __init__(self, dataset, sink_size, epoch_num): 443 super().__init__(dataset, sink_size, epoch_num) 444 self.sink_count = self.get_sink_count(dataset) 445 # for self._parallel_mode equal to semi_auto_parallel or auto_parallel, and not using full_batch, 446 # use a complete tensor to compile, and slice tensor to run. The batch dimension of tensors for 447 # compile is device_number times the batch dimension of tensors for run. Now only support LoopSink. 448 if _need_to_full(): 449 device_num = _get_device_num() // _get_pipeline_stages() 450 self.dataset_shapes = _to_full_shapes(self.dataset_shapes, device_num) 451 452 def op(): 453 return tuple() 454 455 self.op = op 456 457 458class _DatasetIterPSServer(_DatasetIter): 459 """Iter for context on MS_PSERVER or MS_SCHED""" 460 461 def __init__(self, dataset, sink_size, epoch_num): 462 super().__init__(dataset, sink_size, epoch_num) 463 self.sink_count = 1 464 self.sink_size = 1 465 self.op = None 466 467 def op(): 468 return _construct_tensor_list(self.dataset_types, self.dataset_shapes, batch_expand_num=1) 469 470 self.op = op 471 472 473class _DatasetIterPSWork(_DatasetIter): 474 """Iter for context on MS_WORKER""" 475 476 def __init__(self, dataset, sink_size, epoch_num): 477 super().__init__(dataset, sink_size, epoch_num) 478 if sink_size > 0: 479 self.sink_count = sink_size 480 else: 481 self.sink_count = dataset.get_dataset_size() 482 483 def op(): 484 return tuple() 485 486 self.op = op 487 488 489class _DatasetIterNormal: 490 """Iter for normal(non sink) mode, feed the data from host.""" 491 492 def __init__(self, dataset, epoch_num=-1): 493 self.dataset = dataset 494 self.device_num = _get_device_num() 495 self.global_rank = _get_global_rank() 496 self.iter = self.dataset.create_tuple_iterator(num_epochs=epoch_num, do_copy=True) 497 498 def __iter__(self): 499 return self 500 501 def __next__(self): 502 data = self.iter.__next__() 503 return data 504 505 506__all__ = ["DatasetHelper", "connect_network_with_dataset"] 507