# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
15"""
16Create train or eval dataset.
17"""
18import os
19import math
20from enum import Enum
21
22import pandas as pd
23import numpy as np
24import mindspore.dataset as ds
25import mindspore.common.dtype as mstype
26
27from .config import DataConfig
28
29
class DataType(Enum):
    """
    Enumerate supported dataset formats.
    """
    MINDRECORD = 1
    TFRECORD = 2
    H5 = 3


class H5Dataset():
    """
    Create dataset with H5 format.

    Args:
        data_path (str): Dataset directory.
        train_mode (bool): Whether dataset is used for train or eval (default=True).
        train_num_of_parts (int): The number of train data files (default=21).
        test_num_of_parts (int): The number of test data files (default=3).
    """
    max_length = 39

    def __init__(self, data_path, train_mode=True,
                 train_num_of_parts=DataConfig.train_num_of_parts,
                 test_num_of_parts=DataConfig.test_num_of_parts):
        self._hdf_data_dir = data_path
        self._is_training = train_mode
        if self._is_training:
            self._file_prefix = 'train'
            self._num_of_parts = train_num_of_parts
        else:
            self._file_prefix = 'test'
            self._num_of_parts = test_num_of_parts
        self.data_size = self._bin_count(self._hdf_data_dir, self._file_prefix, self._num_of_parts)
        print("data_size: {}".format(self.data_size))

    def _bin_count(self, hdf_data_dir, file_prefix, num_of_parts):
        """Count the total number of samples across all output (label) parts."""
        size = 0
        for part in range(num_of_parts):
            _y = pd.read_hdf(os.path.join(hdf_data_dir, f'{file_prefix}_output_part_{str(part)}.h5'))
            size += _y.shape[0]
        return size

    def _iterate_hdf_files_(self, num_of_parts=None,
                            shuffle_block=False):
        """
        Iterate over the hdf files (blocks). When the whole dataset has been consumed,
            the iterator restarts from the beginning, so the data stream never stops.
        :param num_of_parts: number of files
        :param shuffle_block: shuffle block files at every round
        :return: input_hdf_file_name, output_hdf_file_name, finish_flag
        """
        parts = np.arange(num_of_parts)
        while True:
            if shuffle_block:
                for _ in range(int(shuffle_block)):
                    np.random.shuffle(parts)
            for i, p in enumerate(parts):
                yield os.path.join(self._hdf_data_dir, f'{self._file_prefix}_input_part_{str(p)}.h5'), \
                      os.path.join(self._hdf_data_dir, f'{self._file_prefix}_output_part_{str(p)}.h5'), \
                      i + 1 == len(parts)

    def _generator(self, X, y, batch_size, shuffle=True):
        """
        Internal batch generator; should only be used from within this class.
        :param X: input feature array
        :param y: label array
        :param batch_size: number of samples per batch
        :param shuffle: shuffle samples before batching
        :return: X_batch, y_batch, finished_flag
        """
        number_of_batches = np.ceil(1. * X.shape[0] / batch_size)
        counter = 0
        finished = False
        sample_index = np.arange(X.shape[0])
        if shuffle:
            for _ in range(int(shuffle)):
                np.random.shuffle(sample_index)
        assert X.shape[0] > 0
        while True:
            batch_index = sample_index[batch_size * counter: batch_size * (counter + 1)]
            X_batch = X[batch_index]
            y_batch = y[batch_index]
            counter += 1
            yield X_batch, y_batch, finished
            # The finished flag only turns True after a full pass, so the caller sees it
            # on the first batch of the next pass and stops there.
            if counter == number_of_batches:
                counter = 0
                finished = True

    def batch_generator(self, batch_size=1000,
                        random_sample=False, shuffle_block=False):
        """
        Yield batches of (ids, values, labels) over all hdf file parts.
        :param batch_size: number of samples per batch
        :param random_sample: if True, shuffle samples within each file
        :param shuffle_block: shuffle file blocks at every round
        :return: id batch (int32), value batch (float32), label batch (float32)
        """

        for hdf_in, hdf_out, _ in self._iterate_hdf_files_(self._num_of_parts,
                                                           shuffle_block):
            start = stop = None
            X_all = pd.read_hdf(hdf_in, start=start, stop=stop).values
            y_all = pd.read_hdf(hdf_out, start=start, stop=stop).values
            data_gen = self._generator(X_all, y_all, batch_size,
                                       shuffle=random_sample)
            finished = False

            while not finished:
                X, y, finished = data_gen.__next__()
                X_id = X[:, 0:self.max_length]
                X_va = X[:, self.max_length:]
                yield np.array(X_id.astype(dtype=np.int32)), \
                      np.array(X_va.astype(dtype=np.float32)), \
                      np.array(y.astype(dtype=np.float32))


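# Illustrative sketch (the data directory below is hypothetical): pulling one batch
# straight from H5Dataset, e.g. to inspect shapes and dtypes.
#
#   h5_data = H5Dataset(data_path='./h5_data', train_mode=False)
#   gen = h5_data.batch_generator(batch_size=1000)
#   ids, vals, labels = next(gen)   # int32 ids, float32 values, float32 labels
#
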
def _get_h5_dataset(directory, train_mode=True, epochs=1, batch_size=1000):
    """
    Get dataset with h5 format.

    Args:
        directory (str): Dataset directory.
        train_mode (bool): Whether dataset is used for train or eval (default=True).
        epochs (int): Dataset epoch size (default=1).
        batch_size (int): Dataset batch size (default=1000).

    Returns:
        Dataset.
    """
    data_para = {'batch_size': batch_size}
    if train_mode:
        data_para['random_sample'] = True
        data_para['shuffle_block'] = True

    h5_dataset = H5Dataset(data_path=directory, train_mode=train_mode)
    numbers_of_batch = math.ceil(h5_dataset.data_size / batch_size)

    def _iter_h5_data():
        train_eval_gen = h5_dataset.batch_generator(**data_para)
        for _ in range(0, numbers_of_batch, 1):
            yield train_eval_gen.__next__()

    data_set = ds.GeneratorDataset(_iter_h5_data, ["ids", "weights", "labels"], num_samples=3000)
    data_set = data_set.repeat(epochs)
    return data_set


def _get_mindrecord_dataset(directory, train_mode=True, epochs=1, batch_size=1000,
                            line_per_sample=1000, rank_size=None, rank_id=None):
    """
    Get dataset with mindrecord format.

    Args:
        directory (str): Dataset directory.
        train_mode (bool): Whether dataset is used for train or eval (default=True).
        epochs (int): Dataset epoch size (default=1).
        batch_size (int): Dataset batch size (default=1000).
        line_per_sample (int): The number of samples per line (default=1000).
        rank_size (int): The number of devices, not necessary for single device (default=None).
        rank_id (int): Id of the device, not necessary for single device (default=None).

    Returns:
        Dataset.
    """
    file_prefix_name = 'train_input_part.mindrecord' if train_mode else 'test_input_part.mindrecord'
    file_suffix_name = '00' if train_mode else '0'
    shuffle = train_mode

    if rank_size is not None and rank_id is not None:
        data_set = ds.MindDataset(os.path.join(directory, file_prefix_name + file_suffix_name),
                                  columns_list=['feat_ids', 'feat_vals', 'label'],
                                  num_shards=rank_size, shard_id=rank_id, shuffle=shuffle,
                                  num_parallel_workers=8)
    else:
        data_set = ds.MindDataset(os.path.join(directory, file_prefix_name + file_suffix_name),
                                  columns_list=['feat_ids', 'feat_vals', 'label'],
                                  shuffle=shuffle, num_parallel_workers=8)
    data_set = data_set.batch(int(batch_size / line_per_sample), drop_remainder=True)
    data_set = data_set.map(operations=(lambda x, y, z: (np.array(x).flatten().reshape(batch_size, 39),
                                                         np.array(y).flatten().reshape(batch_size, 39),
                                                         np.array(z).flatten().reshape(batch_size, 1))),
                            input_columns=['feat_ids', 'feat_vals', 'label'],
                            column_order=['feat_ids', 'feat_vals', 'label'],
                            num_parallel_workers=8)
    data_set = data_set.repeat(epochs)
    return data_set


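# Worked example for the batch()/map() pipeline above (numbers are illustrative):
# each MindRecord row packs line_per_sample samples, so with batch_size=16000 and
# line_per_sample=1000 the batch() call reads 16 rows, and map() flattens them back
# into feat_ids/feat_vals of shape (16000, 39) and label of shape (16000, 1).
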
def _get_tf_dataset(directory, train_mode=True, epochs=1, batch_size=1000,
                    line_per_sample=1000, rank_size=None, rank_id=None):
    """
    Get dataset with tfrecord format.

    Args:
        directory (str): Dataset directory.
        train_mode (bool): Whether dataset is used for train or eval (default=True).
        epochs (int): Dataset epoch size (default=1).
        batch_size (int): Dataset batch size (default=1000).
        line_per_sample (int): The number of samples per line (default=1000).
        rank_size (int): The number of devices, not necessary for single device (default=None).
        rank_id (int): Id of the device, not necessary for single device (default=None).

    Returns:
        Dataset.
    """
    dataset_files = []
    file_prefix_name = 'train' if train_mode else 'test'
    shuffle = train_mode
    for (dir_path, _, filenames) in os.walk(directory):
        for filename in filenames:
            if file_prefix_name in filename and 'tfrecord' in filename:
                dataset_files.append(os.path.join(dir_path, filename))
    schema = ds.Schema()
    schema.add_column('feat_ids', de_type=mstype.int32)
    schema.add_column('feat_vals', de_type=mstype.float32)
    schema.add_column('label', de_type=mstype.float32)
    if rank_size is not None and rank_id is not None:
        data_set = ds.TFRecordDataset(dataset_files=dataset_files, shuffle=shuffle,
                                      schema=schema, num_parallel_workers=8,
                                      num_shards=rank_size, shard_id=rank_id,
                                      shard_equal_rows=True, num_samples=3000)
    else:
        data_set = ds.TFRecordDataset(dataset_files=dataset_files, shuffle=shuffle,
                                      schema=schema, num_parallel_workers=8, num_samples=3000)
    data_set = data_set.batch(int(batch_size / line_per_sample), drop_remainder=True)
    data_set = data_set.map(operations=(lambda x, y, z: (
        np.array(x).flatten().reshape(batch_size, 39),
        np.array(y).flatten().reshape(batch_size, 39),
        np.array(z).flatten().reshape(batch_size, 1))),
                            input_columns=['feat_ids', 'feat_vals', 'label'],
                            column_order=['feat_ids', 'feat_vals', 'label'],
                            num_parallel_workers=8)
    data_set = data_set.repeat(epochs)
    return data_set


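# Distributed sketch (illustrative, assuming the communication group has already been
# initialised via mindspore.communication.management.init()): both the MindRecord and
# TFRecord readers above shard the data across devices when rank info is supplied,
# typically through create_dataset below, e.g.
#
#   from mindspore.communication.management import get_rank, get_group_size
#   ds_train = create_dataset(data_dir, data_type=DataType.MINDRECORD,
#                             rank_size=get_group_size(), rank_id=get_rank())
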
def create_dataset(directory, train_mode=True, epochs=1, batch_size=1000,
                   data_type=DataType.TFRECORD, line_per_sample=1000,
                   rank_size=None, rank_id=None):
    """
    Get dataset.

    Args:
        directory (str): Dataset directory.
        train_mode (bool): Whether dataset is used for train or eval (default=True).
        epochs (int): Dataset epoch size (default=1).
        batch_size (int): Dataset batch size (default=1000).
        data_type (DataType): The type of dataset, which is one of H5, TFRECORD, MINDRECORD (default=TFRECORD).
        line_per_sample (int): The number of samples per line (default=1000).
        rank_size (int): The number of devices, not necessary for single device (default=None).
        rank_id (int): Id of the device, not necessary for single device (default=None).

    Returns:
        Dataset.
    """
    if data_type == DataType.MINDRECORD:
        return _get_mindrecord_dataset(directory, train_mode, epochs,
                                       batch_size, line_per_sample,
                                       rank_size, rank_id)
    if data_type == DataType.TFRECORD:
        return _get_tf_dataset(directory, train_mode, epochs, batch_size,
                               line_per_sample, rank_size=rank_size, rank_id=rank_id)

    if rank_size is not None and rank_size > 1:
        raise ValueError('Please use mindrecord dataset.')
    return _get_h5_dataset(directory, train_mode, epochs, batch_size)

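
# Illustrative usage from a training script (the module path and data directory are
# hypothetical; assumes the preprocessed TFRecord part files already exist):
#
#   from src.dataset import create_dataset, DataType
#   ds_train = create_dataset('./data/tfrecord', train_mode=True, epochs=1,
#                             batch_size=1000, data_type=DataType.TFRECORD)
#   for item in ds_train.create_dict_iterator():
#       print(item['feat_ids'].shape, item['feat_vals'].shape, item['label'].shape)
#       break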