1# Copyright 2021 Huawei Technologies Co., Ltd 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15""" 16Test USPS dataset operators 17""" 18import os 19from typing import cast 20 21import matplotlib.pyplot as plt 22import numpy as np 23import pytest 24 25import mindspore.dataset as ds 26import mindspore.dataset.vision.c_transforms as vision 27from mindspore import log as logger 28 29DATA_DIR = "../data/dataset/testUSPSDataset" 30WRONG_DIR = "../data/dataset/testMnistData" 31 32 33def load_usps(path, usage): 34 """ 35 load USPS data 36 """ 37 assert usage in ["train", "test"] 38 if usage == "train": 39 data_path = os.path.realpath(os.path.join(path, "usps")) 40 elif usage == "test": 41 data_path = os.path.realpath(os.path.join(path, "usps.t")) 42 43 with open(data_path, 'r') as f: 44 raw_data = [line.split() for line in f.readlines()] 45 tmp_list = [[x.split(':')[-1] for x in data[1:]] for data in raw_data] 46 images = np.asarray(tmp_list, dtype=np.float32).reshape((-1, 16, 16, 1)) 47 images = ((cast(np.ndarray, images) + 1) / 2 * 255).astype(dtype=np.uint8) 48 labels = [int(d[0]) - 1 for d in raw_data] 49 return images, labels 50 51 52def visualize_dataset(images, labels): 53 """ 54 Helper function to visualize the dataset samples 55 """ 56 num_samples = len(images) 57 for i in range(num_samples): 58 plt.subplot(1, num_samples, i + 1) 59 plt.imshow(images[i].squeeze(), cmap=plt.cm.gray) 60 plt.title(labels[i]) 61 plt.show() 62 63 64def test_usps_content_check(): 65 """ 66 Validate USPSDataset image readings 67 """ 68 logger.info("Test USPSDataset Op with content check") 69 train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=10, shuffle=False) 70 images, labels = load_usps(DATA_DIR, "train") 71 num_iter = 0 72 # in this example, each dictionary has keys "image" and "label" 73 for i, data in enumerate(train_data.create_dict_iterator(num_epochs=1, output_numpy=True)): 74 for m in range(16): 75 for n in range(16): 76 assert (data["image"][m, n, 0] != 0 or images[i][m, n, 0] != 255) and \ 77 (data["image"][m, n, 0] != 255 or images[i][m, n, 0] != 0) 78 assert (data["image"][m, n, 0] == images[i][m, n, 0]) or\ 79 (data["image"][m, n, 0] == images[i][m, n, 0] + 1) or\ 80 (data["image"][m, n, 0] + 1 == images[i][m, n, 0]) 81 np.testing.assert_array_equal(data["label"], labels[i]) 82 num_iter += 1 83 assert num_iter == 3 84 85 test_data = ds.USPSDataset(DATA_DIR, "test", num_samples=3, shuffle=False) 86 images, labels = load_usps(DATA_DIR, "test") 87 num_iter = 0 88 # in this example, each dictionary has keys "image" and "label" 89 for i, data in enumerate(test_data.create_dict_iterator(num_epochs=1, output_numpy=True)): 90 for m in range(16): 91 for n in range(16): 92 if (data["image"][m, n, 0] == 0 and images[i][m, n, 0] == 255) or\ 93 (data["image"][m, n, 0] == 255 and images[i][m, n, 0] == 0): 94 assert False 95 if (data["image"][m, n, 0] != images[i][m, n, 0]) and\ 96 (data["image"][m, n, 0] != images[i][m, n, 0] + 1) and\ 97 (data["image"][m, n, 0] + 1 != images[i][m, n, 0]): 98 assert False 99 np.testing.assert_array_equal(data["label"], labels[i]) 100 num_iter += 1 101 assert num_iter == 3 102 103 104def test_usps_basic(): 105 """ 106 Validate USPSDataset 107 """ 108 logger.info("Test USPSDataset Op") 109 110 # case 1: test loading whole dataset 111 train_data = ds.USPSDataset(DATA_DIR, "train") 112 num_iter = 0 113 for _ in train_data.create_dict_iterator(num_epochs=1): 114 num_iter += 1 115 assert num_iter == 3 116 117 test_data = ds.USPSDataset(DATA_DIR, "test") 118 num_iter = 0 119 for _ in test_data.create_dict_iterator(num_epochs=1): 120 num_iter += 1 121 assert num_iter == 3 122 123 # case 2: test num_samples 124 train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=2) 125 num_iter = 0 126 for _ in train_data.create_dict_iterator(num_epochs=1): 127 num_iter += 1 128 assert num_iter == 2 129 130 # case 3: test repeat 131 train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=2) 132 train_data = train_data.repeat(5) 133 num_iter = 0 134 for _ in train_data.create_dict_iterator(num_epochs=1): 135 num_iter += 1 136 assert num_iter == 10 137 138 # case 4: test batch with drop_remainder=False 139 train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=3) 140 assert train_data.get_dataset_size() == 3 141 assert train_data.get_batch_size() == 1 142 train_data = train_data.batch(batch_size=2) # drop_remainder is default to be False 143 assert train_data.get_batch_size() == 2 144 assert train_data.get_dataset_size() == 2 145 146 num_iter = 0 147 for _ in train_data.create_dict_iterator(num_epochs=1): 148 num_iter += 1 149 assert num_iter == 2 150 151 # case 5: test batch with drop_remainder=True 152 train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=3) 153 assert train_data.get_dataset_size() == 3 154 assert train_data.get_batch_size() == 1 155 train_data = train_data.batch(batch_size=2, drop_remainder=True) # the rest of incomplete batch will be dropped 156 assert train_data.get_dataset_size() == 1 157 assert train_data.get_batch_size() == 2 158 num_iter = 0 159 for _ in train_data.create_dict_iterator(num_epochs=1): 160 num_iter += 1 161 assert num_iter == 1 162 163 164def test_usps_exception(): 165 """ 166 Test error cases for USPSDataset 167 """ 168 error_msg_3 = "num_shards is specified and currently requires shard_id as well" 169 with pytest.raises(RuntimeError, match=error_msg_3): 170 ds.USPSDataset(DATA_DIR, "train", num_shards=10) 171 ds.USPSDataset(DATA_DIR, "test", num_shards=10) 172 173 error_msg_4 = "shard_id is specified but num_shards is not" 174 with pytest.raises(RuntimeError, match=error_msg_4): 175 ds.USPSDataset(DATA_DIR, "train", shard_id=0) 176 ds.USPSDataset(DATA_DIR, "test", shard_id=0) 177 178 error_msg_5 = "Input shard_id is not within the required interval" 179 with pytest.raises(ValueError, match=error_msg_5): 180 ds.USPSDataset(DATA_DIR, "train", num_shards=5, shard_id=-1) 181 ds.USPSDataset(DATA_DIR, "test", num_shards=5, shard_id=-1) 182 with pytest.raises(ValueError, match=error_msg_5): 183 ds.USPSDataset(DATA_DIR, "train", num_shards=5, shard_id=5) 184 ds.USPSDataset(DATA_DIR, "test", num_shards=5, shard_id=5) 185 with pytest.raises(ValueError, match=error_msg_5): 186 ds.USPSDataset(DATA_DIR, "train", num_shards=2, shard_id=5) 187 ds.USPSDataset(DATA_DIR, "test", num_shards=2, shard_id=5) 188 189 error_msg_6 = "num_parallel_workers exceeds" 190 with pytest.raises(ValueError, match=error_msg_6): 191 ds.USPSDataset(DATA_DIR, "train", shuffle=False, num_parallel_workers=0) 192 ds.USPSDataset(DATA_DIR, "test", shuffle=False, num_parallel_workers=0) 193 with pytest.raises(ValueError, match=error_msg_6): 194 ds.USPSDataset(DATA_DIR, "train", shuffle=False, num_parallel_workers=256) 195 ds.USPSDataset(DATA_DIR, "test", shuffle=False, num_parallel_workers=256) 196 with pytest.raises(ValueError, match=error_msg_6): 197 ds.USPSDataset(DATA_DIR, "train", shuffle=False, num_parallel_workers=-2) 198 ds.USPSDataset(DATA_DIR, "test", shuffle=False, num_parallel_workers=-2) 199 200 error_msg_7 = "Argument shard_id" 201 with pytest.raises(TypeError, match=error_msg_7): 202 ds.USPSDataset(DATA_DIR, "train", num_shards=2, shard_id="0") 203 ds.USPSDataset(DATA_DIR, "test", num_shards=2, shard_id="0") 204 205 error_msg_8 = "invalid input shape" 206 with pytest.raises(RuntimeError, match=error_msg_8): 207 train_data = ds.USPSDataset(DATA_DIR, "train") 208 train_data = train_data.map(operations=vision.Decode(), input_columns=["image"], num_parallel_workers=1) 209 for _ in train_data.__iter__(): 210 pass 211 212 test_data = ds.USPSDataset(DATA_DIR, "test") 213 test_data = test_data.map(operations=vision.Decode(), input_columns=["image"], num_parallel_workers=1) 214 for _ in test_data.__iter__(): 215 pass 216 217 error_msg_9 = "failed to find USPS train data file" 218 with pytest.raises(RuntimeError, match=error_msg_9): 219 train_data = ds.USPSDataset(WRONG_DIR, "train") 220 for _ in train_data.__iter__(): 221 pass 222 error_msg_10 = "failed to find USPS test data file" 223 with pytest.raises(RuntimeError, match=error_msg_10): 224 test_data = ds.USPSDataset(WRONG_DIR, "test") 225 for _ in test_data.__iter__(): 226 pass 227 228 229def test_usps_visualize(plot=False): 230 """ 231 Visualize USPSDataset results 232 """ 233 logger.info("Test USPSDataset visualization") 234 235 train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=3, shuffle=False) 236 num_iter = 0 237 image_list, label_list = [], [] 238 for item in train_data.create_dict_iterator(num_epochs=1, output_numpy=True): 239 image = item["image"] 240 label = item["label"] 241 image_list.append(image) 242 label_list.append("label {}".format(label)) 243 assert isinstance(image, np.ndarray) 244 assert image.shape == (16, 16, 1) 245 assert image.dtype == np.uint8 246 assert label.dtype == np.uint32 247 num_iter += 1 248 assert num_iter == 3 249 if plot: 250 visualize_dataset(image_list, label_list) 251 252 test_data = ds.USPSDataset(DATA_DIR, "test", num_samples=3, shuffle=False) 253 num_iter = 0 254 image_list, label_list = [], [] 255 for item in test_data.create_dict_iterator(num_epochs=1, output_numpy=True): 256 image = item["image"] 257 label = item["label"] 258 image_list.append(image) 259 label_list.append("label {}".format(label)) 260 assert isinstance(image, np.ndarray) 261 assert image.shape == (16, 16, 1) 262 assert image.dtype == np.uint8 263 assert label.dtype == np.uint32 264 num_iter += 1 265 assert num_iter == 3 266 if plot: 267 visualize_dataset(image_list, label_list) 268 269 270def test_usps_usage(): 271 """ 272 Validate USPSDataset image readings 273 """ 274 logger.info("Test USPSDataset usage flag") 275 276 def test_config(usage, path=None): 277 path = DATA_DIR if path is None else path 278 try: 279 data = ds.USPSDataset(path, usage=usage, shuffle=False) 280 num_rows = 0 281 for _ in data.create_dict_iterator(num_epochs=1, output_numpy=True): 282 num_rows += 1 283 except (ValueError, TypeError, RuntimeError) as e: 284 return str(e) 285 return num_rows 286 287 assert test_config("train") == 3 288 assert test_config("test") == 3 289 290 assert "usage is not within the valid set of ['train', 'test', 'all']" in test_config("invalid") 291 assert "Argument usage with value ['list'] is not of type [<class 'str'>]" in test_config(["list"]) 292 293 # change this directory to the folder that contains all USPS files 294 all_files_path = None 295 # the following tests on the entire datasets 296 if all_files_path is not None: 297 assert test_config("train", all_files_path) == 3 298 assert test_config("test", all_files_path) == 3 299 assert ds.USPSDataset(all_files_path, usage="train").get_dataset_size() == 3 300 assert ds.USPSDataset(all_files_path, usage="test").get_dataset_size() == 3 301 302 303if __name__ == '__main__': 304 test_usps_content_check() 305 test_usps_basic() 306 test_usps_exception() 307 test_usps_visualize(plot=True) 308 test_usps_usage() 309