# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Create train or eval dataset.
"""
import os
import math
from enum import Enum

import pandas as pd
import numpy as np
import mindspore.dataset as ds
import mindspore.common.dtype as mstype

from .config import DataConfig


class DataType(Enum):
    """
    Enumerate supported dataset formats.
    """
    MINDRECORD = 1
    TFRECORD = 2
    H5 = 3


class H5Dataset():
    """
    Create dataset with H5 format.

    Args:
        data_path (str): Dataset directory.
        train_mode (bool): Whether the dataset is used for train or eval (default=True).
        train_num_of_parts (int): The number of train data files (default=21).
        test_num_of_parts (int): The number of test data files (default=3).
    """
    max_length = 39

    def __init__(self, data_path, train_mode=True,
                 train_num_of_parts=DataConfig.train_num_of_parts,
                 test_num_of_parts=DataConfig.test_num_of_parts):
        self._hdf_data_dir = data_path
        self._is_training = train_mode
        if self._is_training:
            self._file_prefix = 'train'
            self._num_of_parts = train_num_of_parts
        else:
            self._file_prefix = 'test'
            self._num_of_parts = test_num_of_parts
        self.data_size = self._bin_count(self._hdf_data_dir, self._file_prefix, self._num_of_parts)
        print("data_size: {}".format(self.data_size))

    def _bin_count(self, hdf_data_dir, file_prefix, num_of_parts):
        size = 0
        for part in range(num_of_parts):
            _y = pd.read_hdf(os.path.join(hdf_data_dir, f'{file_prefix}_output_part_{str(part)}.h5'))
            size += _y.shape[0]
        return size

    def _iterate_hdf_files_(self, num_of_parts=None,
                            shuffle_block=False):
        """
        Iterate over the hdf files (blocks). When the whole dataset has been consumed,
        the iterator restarts from the beginning, so the data stream never stops.

        :param num_of_parts: number of files
        :param shuffle_block: shuffle the block files at every round
        :return: input_hdf_file_name, output_hdf_file_name, finish_flag
        """
        parts = np.arange(num_of_parts)
        while True:
            if shuffle_block:
                for _ in range(int(shuffle_block)):
                    np.random.shuffle(parts)
            for i, p in enumerate(parts):
                yield os.path.join(self._hdf_data_dir, f'{self._file_prefix}_input_part_{str(p)}.h5'), \
                      os.path.join(self._hdf_data_dir, f'{self._file_prefix}_output_part_{str(p)}.h5'), \
                      i + 1 == len(parts)

    def _generator(self, X, y, batch_size, shuffle=True):
        """
        Should only be accessed internally.

        :param X: input feature array
        :param y: label array
        :param batch_size: number of samples per batch
        :param shuffle: whether to shuffle the sample index
        :return: X_batch, y_batch, finished_flag
        """
        number_of_batches = np.ceil(1. * X.shape[0] / batch_size)
        counter = 0
        finished = False
        sample_index = np.arange(X.shape[0])
        if shuffle:
            for _ in range(int(shuffle)):
                np.random.shuffle(sample_index)
        assert X.shape[0] > 0
        while True:
            batch_index = sample_index[batch_size * counter: batch_size * (counter + 1)]
            X_batch = X[batch_index]
            y_batch = y[batch_index]
            counter += 1
            yield X_batch, y_batch, finished
            if counter == number_of_batches:
                counter = 0
                finished = True

    def batch_generator(self, batch_size=1000,
                        random_sample=False, shuffle_block=False):
        """
        :param batch_size: number of samples per batch
        :param random_sample: if True, shuffle samples within each file
        :param shuffle_block: shuffle file blocks at every round
        :return: generator yielding (ids, weights, labels) batches
        """

        for hdf_in, hdf_out, _ in self._iterate_hdf_files_(self._num_of_parts,
                                                           shuffle_block):
            start = stop = None
            X_all = pd.read_hdf(hdf_in, start=start, stop=stop).values
            y_all = pd.read_hdf(hdf_out, start=start, stop=stop).values
            data_gen = self._generator(X_all, y_all, batch_size,
                                       shuffle=random_sample)
            finished = False

            while not finished:
                X, y, finished = data_gen.__next__()
                X_id = X[:, 0:self.max_length]
                X_va = X[:, self.max_length:]
                yield np.array(X_id.astype(dtype=np.int32)), \
                      np.array(X_va.astype(dtype=np.float32)), \
                      np.array(y.astype(dtype=np.float32))
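

# A minimal usage sketch for H5Dataset (assumes the directory layout expected by
# _iterate_hdf_files_: train_input_part_<i>.h5 / train_output_part_<i>.h5 pairs;
# the path below is only an example):
#
#     h5_dataset = H5Dataset(data_path='./data/h5', train_mode=True)
#     gen = h5_dataset.batch_generator(batch_size=1000, random_sample=True)
#     ids, weights, labels = next(gen)
#     # ids: int32, first max_length (39) columns of each row;
#     # weights: float32, the remaining columns; labels: float32 of shape (batch, 1)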


def _get_h5_dataset(directory, train_mode=True, epochs=1, batch_size=1000):
    """
    Get dataset with h5 format.

    Args:
        directory (str): Dataset directory.
        train_mode (bool): Whether the dataset is used for train or eval (default=True).
        epochs (int): Dataset epoch size (default=1).
        batch_size (int): Dataset batch size (default=1000).

    Returns:
        Dataset.
    """
    data_para = {'batch_size': batch_size}
    if train_mode:
        data_para['random_sample'] = True
        data_para['shuffle_block'] = True

    h5_dataset = H5Dataset(data_path=directory, train_mode=train_mode)
    numbers_of_batch = math.ceil(h5_dataset.data_size / batch_size)

    def _iter_h5_data():
        train_eval_gen = h5_dataset.batch_generator(**data_para)
        for _ in range(0, numbers_of_batch, 1):
            yield train_eval_gen.__next__()

    data_set = ds.GeneratorDataset(_iter_h5_data, ["ids", "weights", "labels"], num_samples=3000)
    data_set = data_set.repeat(epochs)
    return data_set
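

# Note on the MindRecord/TFRecord readers below (an illustrative sketch of the
# record-packing arithmetic, using the default parameter values):
#
#     batch_size = 1000        # samples per training step
#     line_per_sample = 1000   # samples packed into each stored record
#     records_per_batch = int(batch_size / line_per_sample)   # -> 1
#
# Batching records_per_batch records and flattening yields batch_size * 39
# feature values, which the map operation reshapes back to (batch_size, 39)
# per-sample rows.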


def _get_mindrecord_dataset(directory, train_mode=True, epochs=1, batch_size=1000,
                            line_per_sample=1000, rank_size=None, rank_id=None):
    """
    Get dataset with mindrecord format.

    Args:
        directory (str): Dataset directory.
        train_mode (bool): Whether the dataset is used for train or eval (default=True).
        epochs (int): Dataset epoch size (default=1).
        batch_size (int): Dataset batch size (default=1000).
        line_per_sample (int): The number of samples per line (default=1000).
        rank_size (int): The number of devices; not needed for a single device (default=None).
        rank_id (int): Id of the device; not needed for a single device (default=None).

    Returns:
        Dataset.
    """
    file_prefix_name = 'train_input_part.mindrecord' if train_mode else 'test_input_part.mindrecord'
    file_suffix_name = '00' if train_mode else '0'
    shuffle = train_mode

    if rank_size is not None and rank_id is not None:
        data_set = ds.MindDataset(os.path.join(directory, file_prefix_name + file_suffix_name),
                                  columns_list=['feat_ids', 'feat_vals', 'label'],
                                  num_shards=rank_size, shard_id=rank_id, shuffle=shuffle,
                                  num_parallel_workers=8)
    else:
        data_set = ds.MindDataset(os.path.join(directory, file_prefix_name + file_suffix_name),
                                  columns_list=['feat_ids', 'feat_vals', 'label'],
                                  shuffle=shuffle, num_parallel_workers=8)
    # Each record packs line_per_sample samples, so batch on records and reshape
    # back to per-sample rows of 39 features.
    data_set = data_set.batch(int(batch_size / line_per_sample), drop_remainder=True)
    data_set = data_set.map(operations=(lambda x, y, z: (np.array(x).flatten().reshape(batch_size, 39),
                                                         np.array(y).flatten().reshape(batch_size, 39),
                                                         np.array(z).flatten().reshape(batch_size, 1))),
                            input_columns=['feat_ids', 'feat_vals', 'label'],
                            column_order=['feat_ids', 'feat_vals', 'label'],
                            num_parallel_workers=8)
    data_set = data_set.repeat(epochs)
    return data_set


def _get_tf_dataset(directory, train_mode=True, epochs=1, batch_size=1000,
                    line_per_sample=1000, rank_size=None, rank_id=None):
    """
    Get dataset with tfrecord format.

    Args:
        directory (str): Dataset directory.
        train_mode (bool): Whether the dataset is used for train or eval (default=True).
        epochs (int): Dataset epoch size (default=1).
        batch_size (int): Dataset batch size (default=1000).
        line_per_sample (int): The number of samples per line (default=1000).
        rank_size (int): The number of devices; not needed for a single device (default=None).
        rank_id (int): Id of the device; not needed for a single device (default=None).

    Returns:
        Dataset.
    """
    dataset_files = []
    file_prefix_name = 'train' if train_mode else 'test'
    shuffle = train_mode
    for (dir_path, _, filenames) in os.walk(directory):
        for filename in filenames:
            if file_prefix_name in filename and 'tfrecord' in filename:
                dataset_files.append(os.path.join(dir_path, filename))
    schema = ds.Schema()
    schema.add_column('feat_ids', de_type=mstype.int32)
    schema.add_column('feat_vals', de_type=mstype.float32)
    schema.add_column('label', de_type=mstype.float32)
    if rank_size is not None and rank_id is not None:
        data_set = ds.TFRecordDataset(dataset_files=dataset_files, shuffle=shuffle,
                                      schema=schema, num_parallel_workers=8,
                                      num_shards=rank_size, shard_id=rank_id,
                                      shard_equal_rows=True, num_samples=3000)
    else:
        data_set = ds.TFRecordDataset(dataset_files=dataset_files, shuffle=shuffle,
                                      schema=schema, num_parallel_workers=8, num_samples=3000)
    data_set = data_set.batch(int(batch_size / line_per_sample), drop_remainder=True)
    data_set = data_set.map(operations=(lambda x, y, z: (
        np.array(x).flatten().reshape(batch_size, 39),
        np.array(y).flatten().reshape(batch_size, 39),
        np.array(z).flatten().reshape(batch_size, 1))),
                            input_columns=['feat_ids', 'feat_vals', 'label'],
                            column_order=['feat_ids', 'feat_vals', 'label'],
                            num_parallel_workers=8)
    data_set = data_set.repeat(epochs)
    return data_set


def create_dataset(directory, train_mode=True, epochs=1, batch_size=1000,
                   data_type=DataType.TFRECORD, line_per_sample=1000,
                   rank_size=None, rank_id=None):
    """
    Get dataset.

    Args:
        directory (str): Dataset directory.
        train_mode (bool): Whether the dataset is used for train or eval (default=True).
        epochs (int): Dataset epoch size (default=1).
        batch_size (int): Dataset batch size (default=1000).
        data_type (DataType): The type of dataset, one of H5, TFRECORD, MINDRECORD (default=TFRECORD).
        line_per_sample (int): The number of samples per line (default=1000).
        rank_size (int): The number of devices; not needed for a single device (default=None).
        rank_id (int): Id of the device; not needed for a single device (default=None).

    Returns:
        Dataset.
    """
    if data_type == DataType.MINDRECORD:
        return _get_mindrecord_dataset(directory, train_mode, epochs,
                                       batch_size, line_per_sample,
                                       rank_size, rank_id)
    if data_type == DataType.TFRECORD:
        return _get_tf_dataset(directory, train_mode, epochs, batch_size,
                               line_per_sample, rank_size=rank_size, rank_id=rank_id)

    if rank_size is not None and rank_size > 1:
        raise ValueError('Please use mindrecord dataset.')
    return _get_h5_dataset(directory, train_mode, epochs, batch_size)
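

# A minimal smoke test (a hedged sketch, not part of the original module). The data
# directory and format are assumptions; point them at an existing dataset that matches
# the file naming used by the readers above. Because of the relative import of
# DataConfig, run this as a module, e.g. `python -m src.dataset ./data/mindrecord`
# if this file lives in a package named `src`.
if __name__ == '__main__':
    import sys

    data_dir = sys.argv[1] if len(sys.argv) > 1 else './data/mindrecord'  # example path
    demo_ds = create_dataset(data_dir, train_mode=False, epochs=1,
                             batch_size=1000, data_type=DataType.MINDRECORD)
    # Print the shape of each column of the first batch, then stop.
    for item in demo_ds.create_dict_iterator():
        print({name: value.shape for name, value in item.items()})
        break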