# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
15"""Dataset help for minddata dataset"""
import math
import os

from mindspore._checkparam import Validator
from mindspore.common.dtype import pytype_to_dtype
from .. import context, nn
from ._utils import _exec_datagraph, _get_types_and_shapes, _construct_tensor_list
from ..parallel._utils import _get_device_num, _get_global_rank, _need_to_full, _to_full_shapes, _get_pipeline_stages
from ..ops import operations as P


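# Data is pushed to the device through the dataset's __transfer_dataset__ queue.
# _send_data marks the dataset with __has_sent__ so the same dataset object is only
# sent once; _send_data_no_flag sends unconditionally.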
def _send_data(dataset, epoch_num):
    """Write data from the engine dataset to the tdt queue."""
    if not hasattr(dataset, '__has_sent__'):
        exec_dataset = dataset.__transfer_dataset__
        exec_dataset.send(epoch_num)
        dataset.__has_sent__ = True


def _send_data_no_flag(dataset, epoch_num):
    """Write data from the engine dataset to the tdt queue directly."""
    exec_dataset = dataset.__transfer_dataset__
    exec_dataset.send(epoch_num)


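# The dynamic sink scenario only applies when a single step is sunk at a time
# (sink_size == 1 and sink_count == 1) on a dataset with more than one step,
# on Ascend in graph mode, and outside the parameter-server worker role.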
def _dynamic_sink_scenario(dataset, dataset_iter):
    """Special scenario with dynamic shape and sink_size=1."""
    flag = False
    ms_role = os.getenv("MS_ROLE")
    if hasattr(dataset_iter, "sink_size") and \
       dataset_iter.sink_size == 1 and \
       dataset.get_dataset_size() != 1 and \
       hasattr(dataset_iter, "sink_count") and \
       dataset_iter.sink_count == 1 and \
       context.get_context("device_target") == "Ascend" and \
       context.get_context("mode") == context.GRAPH_MODE and \
       ms_role != "MS_WORKER":
        flag = True
    return flag


class _DataWrapper(nn.Cell):
    """
    Wraps the input network with a dataset which automatically fetches data with 'GetNext' function from the
    dataset channel 'queue_name' and performs the forward computation.
    """

    def __init__(self, network, dataset_types, dataset_shapes, queue_name, min_shapes=None, max_shapes=None):
        super(_DataWrapper, self).__init__(auto_prefix=False, flags=network.get_flags())
        # Also copy the flags attached to the construct method of `network`
        flags = getattr(network.__class__.construct, "_mindspore_flags", {})
        self.info = (dataset_types, dataset_shapes)
        self.add_flags(**flags)
        self.get_next = P.GetNext(dataset_types, dataset_shapes, len(dataset_types), queue_name)
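        # For dynamic shapes, the optional min_shapes/max_shapes are attached to GetNext as
        # primitive attributes, giving the backend the shape range of the dynamic dimensions.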
        if min_shapes is not None and max_shapes is not None:
            Validator.check_value_type("min_shapes", min_shapes, [list, tuple])
            Validator.check_value_type("max_shapes", max_shapes, [list, tuple])
            self.get_next.add_prim_attr("min_shapes", min_shapes)
            self.get_next.add_prim_attr("max_shapes", max_shapes)
        self.network = network

    def construct(self):
        outputs = self.get_next()
        return self.network(*outputs)


def _generate_dataset_sink_mode_net(network, dataset_shapes, dataset_types, queue_name,
                                    min_shapes=None, max_shapes=None):
    if not isinstance(network, _DataWrapper):
        network = _DataWrapper(network, dataset_types, dataset_shapes, queue_name, min_shapes, max_shapes)
    return network


def has_dynamic_shape(dataset_shapes):
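    """Check whether any shape in `dataset_shapes` contains a dynamic (-1) dimension."""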
    for shape in dataset_shapes:
        if -1 in shape:
            return True
    return False


def _generate_network_with_dataset(network, dataset_helper, queue_name):
    dataset_types, dataset_shapes = dataset_helper.types_shapes()
    (min_shapes, max_shapes) = (None, None) if not has_dynamic_shape(dataset_shapes) \
        else dataset_helper.dynamic_min_max_shapes()
    network = _generate_dataset_sink_mode_net(network, dataset_shapes, dataset_types,
                                              queue_name, min_shapes, max_shapes)
    return network


def connect_network_with_dataset(network, dataset_helper):
    """
    Connect the `network` with the dataset in `dataset_helper`.

    This function wraps the input network with 'GetNext' so that the data can be fetched automatically from the
    data channel corresponding to the 'queue_name' and passed to the input network during forward computation.

    Note:
        When running the network on Ascend/GPU in graph mode, this function wraps the input network with
        'GetNext'; in other cases, the input network is returned unchanged.
        'GetNext' is required to get data only in sink mode, so this function is not applicable to non-sink mode.

    Args:
        network (Cell): The training network for dataset.
        dataset_helper (DatasetHelper): A class to process the MindData dataset; it provides the type, shape and queue
            name of the dataset to wrap the `GetNext`.

    Returns:
        Cell, a new network wrapped with 'GetNext' when running the task on Ascend/GPU in graph mode, otherwise
        it is the input network.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> from mindspore import DatasetHelper
        >>>
        >>> # call create_dataset function to create a regular dataset, refer to mindspore.dataset
        >>> train_dataset = create_custom_dataset()
        >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=True)
        >>> net = Net()
        >>> net_with_get_next = connect_network_with_dataset(net, dataset_helper)
    """

    dataset_iter = dataset_helper.iter
    dataset = dataset_iter.dataset

    if isinstance(dataset_iter, _DatasetIterNormal):
        raise RuntimeError("The API 'connect_network_with_dataset' should be called in dataset sink mode.")

    ms_role = os.getenv("MS_ROLE")
    if ms_role in ("MS_PSERVER", "MS_SCHED"):
        return network

    queue_name = dataset.__transfer_dataset__.queue_name
    if _dynamic_sink_scenario(dataset, dataset_iter):
        if not hasattr(dataset_iter, '__network__'):
            dataset_iter.__network__ = network
        network = dataset_iter.__network__

        dataset_types, dataset_shapes = dataset_helper.get_data_info()
        dataset_types = [pytype_to_dtype(x) for x in dataset_types]

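        # Cache one wrapped network per (types, shapes) combination so that a graph compiled for
        # previously seen data shapes can be reused instead of being rebuilt on every step.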
        key = str(dataset_types) + str(dataset_shapes)
        if hasattr(dataset_iter, '__network_manage__') and key in dataset_iter.__network_manage__:
            network = dataset_iter.__network_manage__[key]
        else:
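            # Under (semi-)auto parallel without full_batch, compile with full-batch shapes:
            # the batch dimension is expanded by the number of devices per pipeline stage.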
            if _need_to_full():
                device_num = _get_device_num() // _get_pipeline_stages()
                dataset_shapes = _to_full_shapes(dataset_shapes, device_num)

            network = _generate_dataset_sink_mode_net(network, dataset_shapes, dataset_types, queue_name)
            dataset_iter.__network_manage__ = dataset_iter.__network_manage__ if hasattr(
                dataset_iter, '__network_manage__') else dict()
            dataset_iter.__network_manage__[key] = network
        return network

    if not hasattr(dataset, '__me_inited__') and \
       not context.get_context("enable_ge") and \
       context.get_context("device_target") in ("Ascend", "GPU"):
        dataset.__me_inited__ = True
        network = _generate_network_with_dataset(network, dataset_helper, queue_name)

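    # In PyNative mode with single-step sinking on Ascend, fetch (and discard) the data info for
    # the current step; this keeps the data-info queue consumed in step with the data queue.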
    if hasattr(dataset_iter, "sink_size") and \
       dataset_iter.sink_size == 1 and \
       dataset.get_dataset_size() != 1 and \
       hasattr(dataset_iter, "sink_count") and \
       dataset_iter.sink_count == 1 and \
       context.get_context("device_target") == "Ascend" and \
       context.get_context("mode") == context.PYNATIVE_MODE:
        dataset_helper.get_data_info()

    return network


class DatasetHelper:
    """
    DatasetHelper is a class that processes the MindData dataset and provides information about the dataset.

    According to different contexts, it adjusts the dataset iteration so that the same iteration loop can be used
    in different contexts.

    Note:
        One iteration of DatasetHelper provides one epoch of data.

    Args:
        dataset (Dataset): The dataset iterator. The dataset can be generated by dataset generator API in
                           :class:`mindspore.dataset`, such as :class:`mindspore.dataset.ImageFolderDataset`.
        dataset_sink_mode (bool): If the value is True, GetNext is employed to fetch the data; otherwise the data is
                                  fed from the host. Default: True.
        sink_size (int): Control the amount of data in each sink.
                         If sink_size=-1, sink the complete dataset for each epoch.
                         If sink_size>0, sink `sink_size` batches of data for each epoch.
                         Default: -1.
        epoch_num (int): The number of passes of the entire dataset to be sent. Default: 1.

    Examples:
        >>> from mindspore import DatasetHelper
        >>>
        >>> train_dataset = create_custom_dataset()
        >>> set_helper = DatasetHelper(train_dataset, dataset_sink_mode=False)
        >>>
        >>> net = Net()
        >>> # Object of DatasetHelper is iterable
        >>> for next_element in set_helper:
        ...     # `next_element` includes data and label, use the data to run the net
        ...     data = next_element[0]
        ...     net(data)
    """

    def __init__(self, dataset, dataset_sink_mode=True, sink_size=-1, epoch_num=1):
        dataset_sink_mode = Validator.check_bool(dataset_sink_mode)
        Validator.check_is_int(sink_size)
        if sink_size < -1 or sink_size == 0:
            raise ValueError("The 'sink_size' must be -1 or positive, but got sink_size {}.".format(sink_size))
        if sink_size == -1:
            sink_size = dataset.get_dataset_size()

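        # Choose the iterator implementation: the GE backend uses _DatasetIterGE; graph mode picks
        # a sink iterator by role (parameter server/scheduler/worker) and device target; PyNative
        # mode uses _DatasetIterPyNative; non-sink mode falls back to _DatasetIterNormal.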
        if dataset_sink_mode:
            if context.get_context("enable_ge"):
                iterclass = _DatasetIterGE
            else:
                if context.get_context("mode") == context.GRAPH_MODE:
                    ms_role = os.getenv("MS_ROLE")
                    if ms_role in ("MS_PSERVER", "MS_SCHED"):
                        iterclass = _DatasetIterPSServer
                    elif ms_role == "MS_WORKER":
                        iterclass = _DatasetIterPSWork
                    elif (context.get_context("device_target") == "Ascend") or \
                         (context.get_context("device_target") == "GPU"):
                        iterclass = _DatasetIterMSLoopSink
                    elif context.get_context("device_target") == "CPU":
                        raise RuntimeError("Currently dataset sink mode is not supported when the device "
                                           "target is CPU, please set dataset sink mode to False.")
                else:
                    iterclass = _DatasetIterPyNative
            self.iter = iterclass(dataset, sink_size, epoch_num)
        else:
            iterclass = _DatasetIterNormal
            self.iter = iterclass(dataset, epoch_num=epoch_num)

    def __iter__(self):
        return self.iter.__iter__()

    # A temp solution for loop sink. Delete later
    def types_shapes(self):
        """
        Get the types and shapes from the dataset on the current configuration.

        Examples:
            >>> from mindspore import DatasetHelper
            >>>
            >>> train_dataset = create_custom_dataset()
            >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=False)
            >>>
            >>> types, shapes = dataset_helper.types_shapes()
        """
        return self.iter.types_shapes()

    def sink_size(self):
        """
        Get sink_size for each iteration.

        Examples:
            >>> from mindspore import DatasetHelper
            >>>
            >>> train_dataset = create_custom_dataset()
            >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=True, sink_size=-1)
            >>>
            >>> # if sink_size==-1, sink_size() returns the full size of the source dataset.
            >>> sink_size = dataset_helper.sink_size()
        """
        return self.iter.get_sink_size()

    def stop_send(self):
        """Stop sending data in data sink mode."""
        self.iter.stop_send()

    def release(self):
        """Free up the resources used by data sinking."""
        self.iter.release()

    def continue_send(self):
        """Continue to send data to the device at the beginning of an epoch."""
        self.iter.continue_send()

    def get_data_info(self):
        """
        In sink mode, it returns the types and shapes of the current data.
        Generally, it works in dynamic shape scenarios.

        Examples:
            >>> from mindspore import DatasetHelper
            >>>
            >>> train_dataset = create_custom_dataset()
            >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=True)
            >>>
            >>> types, shapes = dataset_helper.get_data_info()
        """
        return self.iter.get_data_info()

    def dynamic_min_max_shapes(self):
        """
        Return the minimum and maximum data length of the dynamic source dataset.

        Examples:
            >>> from mindspore import DatasetHelper
            >>>
            >>> train_dataset = create_custom_dataset()
            >>> # config dynamic shape
            >>> train_dataset.set_dynamic_columns(columns={"data1": [16, None, 83], "data2": [None]})
            >>> dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=True)
            >>>
            >>> min_shapes, max_shapes = dataset_helper.dynamic_min_max_shapes()
        """
        return self.iter.dynamic_min_max_shapes()


class _DatasetIter:
    """Base iter for dataset helper."""

    def __init__(self, dataset, sink_size, epoch_num):
        self.dataset = dataset
        self.sink_size = sink_size
        self.sink_count = self.get_sink_count(dataset)

        if not hasattr(dataset, '__transfer_dataset__'):
            if hasattr(dataset, '__loop_size__'):
                ms_role = os.getenv("MS_ROLE")
                # PS mode does not support loop sink and needs to get the real sink size.
                if ms_role != "MS_WORKER":
                    self.sink_size = dataset.__loop_size__
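            # A data-info queue is created only for single-step sinking on Ascend; get_data_info()
            # relies on it in dynamic shape scenarios.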
            create_data_info_queue = (sink_size == 1 and self.sink_count == 1 and context.get_context(
                "device_target") == "Ascend")
            dataset.__transfer_dataset__ = _exec_datagraph(dataset, self.sink_size,
                                                           create_data_info_queue=create_data_info_queue)

            if not hasattr(dataset, '__no_send__'):
                _send_data(dataset, epoch_num)
        else:
            _send_data_no_flag(dataset, epoch_num)

        self.stop_send = dataset.__transfer_dataset__.stop_send
        self.release = dataset.__transfer_dataset__.release
        self.continue_send = dataset.__transfer_dataset__.continue_send
        self.get_data_info = dataset.__transfer_dataset__.get_data_info
        self.dynamic_min_max_shapes = dataset.dynamic_min_max_shapes
        self.dataset_types, self.dataset_shapes = _get_types_and_shapes(dataset)

    def __iter__(self):
        self.index = 0
        return self

    def __next__(self):
        if self.index >= self.sink_count:
            raise StopIteration()
        self.index += 1
        return self.op()

    def types_shapes(self):
        return self.dataset_types, self.dataset_shapes

    def get_sink_count(self, dataset):
        sink_count = 1
        if hasattr(dataset, '__loop_size__'):
            loop_size = dataset.__loop_size__
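            # With loop sink, the dataset is consumed __loop_size__ steps at a time, so the dataset
            # size must be divisible by the sink size.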
            if loop_size <= dataset.get_dataset_size() and dataset.get_dataset_size() % loop_size != 0:
                raise ValueError(f"Dataset size {dataset.get_dataset_size()} and 'sink_size' {loop_size} "
                                 f"are not matched, dataset size should be divisible by 'sink_size'.")
            sink_count = math.ceil(dataset.get_dataset_size() / loop_size)
        return sink_count

    def get_sink_size(self):
        """Get the sink size sent to the device."""
        sink_size = 1
        ms_role = os.getenv("MS_ROLE")
        if hasattr(self.dataset, '__loop_size__'):
            sink_size = self.dataset.__loop_size__
        elif ms_role == "MS_WORKER":
            # PS mode does not support loop sink.
            sink_size = 1
        else:
            if context.get_context("enable_ge") or context.get_context("device_target") == "Ascend" \
                    or context.get_context("device_target") == "GPU":
                if self.sink_size > 0:
                    sink_size = self.sink_size
                else:
                    sink_size = self.dataset.get_dataset_size()
        return sink_size


class _DatasetIterGE(_DatasetIter):
    """Iter for GE."""

    def __init__(self, dataset, sink_size, epoch_num):
        super().__init__(dataset, sink_size, epoch_num)
        self.sink_count = self.get_sink_count(dataset)
        batch_expand_num = 1
        if _need_to_full():
            batch_expand_num = _get_device_num() // _get_pipeline_stages()
        tensor_list_run = _construct_tensor_list(self.dataset_types, self.dataset_shapes, batch_expand_num)
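        # op() returns placeholder tensors built by _construct_tensor_list from the dataset
        # types/shapes; the real data is delivered through the GE data channel.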

        def op():
            return tensor_list_run

        self.op = op


class _DatasetIterPyNative(_DatasetIter):
    """Iter for context (mode=PYNATIVE_MODE)."""

    def __init__(self, dataset, sink_size, epoch_num):
        super().__init__(dataset, sink_size, epoch_num)
        if sink_size > 0:
            self.sink_count = sink_size
        else:
            self.sink_count = dataset.get_dataset_size()

        def op():
            return tuple()

        self.op = op


class _DatasetIterMSLoopSink(_DatasetIter):
    """Iter for context (device_target=Ascend/GPU) in graph mode."""

    def __init__(self, dataset, sink_size, epoch_num):
        super().__init__(dataset, sink_size, epoch_num)
        self.sink_count = self.get_sink_count(dataset)
        # When self._parallel_mode is semi_auto_parallel or auto_parallel and full_batch is not used,
        # compile with a complete (full-batch) tensor and slice the tensor at run time. The batch
        # dimension of the tensors used for compiling is device_number times the batch dimension of
        # the tensors used for running. Currently only LoopSink is supported.
        if _need_to_full():
            device_num = _get_device_num() // _get_pipeline_stages()
            self.dataset_shapes = _to_full_shapes(self.dataset_shapes, device_num)

        def op():
            return tuple()

        self.op = op


class _DatasetIterPSServer(_DatasetIter):
    """Iter for context on MS_PSERVER or MS_SCHED"""

    def __init__(self, dataset, sink_size, epoch_num):
        super().__init__(dataset, sink_size, epoch_num)
        self.sink_count = 1
        self.sink_size = 1
        self.op = None

        def op():
            return _construct_tensor_list(self.dataset_types, self.dataset_shapes, batch_expand_num=1)

        self.op = op


class _DatasetIterPSWork(_DatasetIter):
    """Iter for context on MS_WORKER"""

    def __init__(self, dataset, sink_size, epoch_num):
        super().__init__(dataset, sink_size, epoch_num)
        if sink_size > 0:
            self.sink_count = sink_size
        else:
            self.sink_count = dataset.get_dataset_size()

        def op():
            return tuple()

        self.op = op


class _DatasetIterNormal:
    """Iter for normal (non-sink) mode, where the data is fed from the host."""

    def __init__(self, dataset, epoch_num=-1):
        self.dataset = dataset
        self.device_num = _get_device_num()
        self.global_rank = _get_global_rank()
        self.iter = self.dataset.create_tuple_iterator(num_epochs=epoch_num, do_copy=True)

    def __iter__(self):
        return self

    def __next__(self):
        data = self.iter.__next__()
        return data


__all__ = ["DatasetHelper", "connect_network_with_dataset"]