• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 Huawei Technologies Co., Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""
16Test USPS dataset operators
17"""
18import os
19from typing import cast
20
21import matplotlib.pyplot as plt
22import numpy as np
23import pytest
24
25import mindspore.dataset as ds
26import mindspore.dataset.vision.c_transforms as vision
27from mindspore import log as logger
28
29DATA_DIR = "../data/dataset/testUSPSDataset"
30WRONG_DIR = "../data/dataset/testMnistData"
31
32
33def load_usps(path, usage):
34    """
35    load USPS data
36    """
37    assert usage in ["train", "test"]
38    if usage == "train":
39        data_path = os.path.realpath(os.path.join(path, "usps"))
40    elif usage == "test":
41        data_path = os.path.realpath(os.path.join(path, "usps.t"))
42
43    with open(data_path, 'r') as f:
44        raw_data = [line.split() for line in f.readlines()]
45        tmp_list = [[x.split(':')[-1] for x in data[1:]] for data in raw_data]
46        images = np.asarray(tmp_list, dtype=np.float32).reshape((-1, 16, 16, 1))
47        images = ((cast(np.ndarray, images) + 1) / 2 * 255).astype(dtype=np.uint8)
48        labels = [int(d[0]) - 1 for d in raw_data]
49    return images, labels
50
51
52def visualize_dataset(images, labels):
53    """
54    Helper function to visualize the dataset samples
55    """
56    num_samples = len(images)
57    for i in range(num_samples):
58        plt.subplot(1, num_samples, i + 1)
59        plt.imshow(images[i].squeeze(), cmap=plt.cm.gray)
60        plt.title(labels[i])
61    plt.show()
62
63
64def test_usps_content_check():
65    """
66    Validate USPSDataset image readings
67    """
68    logger.info("Test USPSDataset Op with content check")
69    train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=10, shuffle=False)
70    images, labels = load_usps(DATA_DIR, "train")
71    num_iter = 0
72    # in this example, each dictionary has keys "image" and "label"
73    for i, data in enumerate(train_data.create_dict_iterator(num_epochs=1, output_numpy=True)):
74        for m in range(16):
75            for n in range(16):
76                assert (data["image"][m, n, 0] != 0 or images[i][m, n, 0] != 255) and \
77                        (data["image"][m, n, 0] != 255 or images[i][m, n, 0] != 0)
78                assert (data["image"][m, n, 0] == images[i][m, n, 0]) or\
79                        (data["image"][m, n, 0] == images[i][m, n, 0] + 1) or\
80                        (data["image"][m, n, 0] + 1 == images[i][m, n, 0])
81        np.testing.assert_array_equal(data["label"], labels[i])
82        num_iter += 1
83    assert num_iter == 3
84
85    test_data = ds.USPSDataset(DATA_DIR, "test", num_samples=3, shuffle=False)
86    images, labels = load_usps(DATA_DIR, "test")
87    num_iter = 0
88    # in this example, each dictionary has keys "image" and "label"
89    for i, data in enumerate(test_data.create_dict_iterator(num_epochs=1, output_numpy=True)):
90        for m in range(16):
91            for n in range(16):
92                if (data["image"][m, n, 0] == 0 and images[i][m, n, 0] == 255) or\
93                        (data["image"][m, n, 0] == 255 and images[i][m, n, 0] == 0):
94                    assert False
95                if (data["image"][m, n, 0] != images[i][m, n, 0]) and\
96                        (data["image"][m, n, 0] != images[i][m, n, 0] + 1) and\
97                        (data["image"][m, n, 0] + 1 != images[i][m, n, 0]):
98                    assert False
99        np.testing.assert_array_equal(data["label"], labels[i])
100        num_iter += 1
101    assert num_iter == 3
102
103
104def test_usps_basic():
105    """
106    Validate USPSDataset
107    """
108    logger.info("Test USPSDataset Op")
109
110    # case 1: test loading whole dataset
111    train_data = ds.USPSDataset(DATA_DIR, "train")
112    num_iter = 0
113    for _ in train_data.create_dict_iterator(num_epochs=1):
114        num_iter += 1
115    assert num_iter == 3
116
117    test_data = ds.USPSDataset(DATA_DIR, "test")
118    num_iter = 0
119    for _ in test_data.create_dict_iterator(num_epochs=1):
120        num_iter += 1
121    assert num_iter == 3
122
123    # case 2: test num_samples
124    train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=2)
125    num_iter = 0
126    for _ in train_data.create_dict_iterator(num_epochs=1):
127        num_iter += 1
128    assert num_iter == 2
129
130    # case 3: test repeat
131    train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=2)
132    train_data = train_data.repeat(5)
133    num_iter = 0
134    for _ in train_data.create_dict_iterator(num_epochs=1):
135        num_iter += 1
136    assert num_iter == 10
137
138    # case 4: test batch with drop_remainder=False
139    train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=3)
140    assert train_data.get_dataset_size() == 3
141    assert train_data.get_batch_size() == 1
142    train_data = train_data.batch(batch_size=2)  # drop_remainder is default to be False
143    assert train_data.get_batch_size() == 2
144    assert train_data.get_dataset_size() == 2
145
146    num_iter = 0
147    for _ in train_data.create_dict_iterator(num_epochs=1):
148        num_iter += 1
149    assert num_iter == 2
150
151    # case 5: test batch with drop_remainder=True
152    train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=3)
153    assert train_data.get_dataset_size() == 3
154    assert train_data.get_batch_size() == 1
155    train_data = train_data.batch(batch_size=2, drop_remainder=True)  # the rest of incomplete batch will be dropped
156    assert train_data.get_dataset_size() == 1
157    assert train_data.get_batch_size() == 2
158    num_iter = 0
159    for _ in train_data.create_dict_iterator(num_epochs=1):
160        num_iter += 1
161    assert num_iter == 1
162
163
164def test_usps_exception():
165    """
166    Test error cases for USPSDataset
167    """
168    error_msg_3 = "num_shards is specified and currently requires shard_id as well"
169    with pytest.raises(RuntimeError, match=error_msg_3):
170        ds.USPSDataset(DATA_DIR, "train", num_shards=10)
171        ds.USPSDataset(DATA_DIR, "test", num_shards=10)
172
173    error_msg_4 = "shard_id is specified but num_shards is not"
174    with pytest.raises(RuntimeError, match=error_msg_4):
175        ds.USPSDataset(DATA_DIR, "train", shard_id=0)
176        ds.USPSDataset(DATA_DIR, "test", shard_id=0)
177
178    error_msg_5 = "Input shard_id is not within the required interval"
179    with pytest.raises(ValueError, match=error_msg_5):
180        ds.USPSDataset(DATA_DIR, "train", num_shards=5, shard_id=-1)
181        ds.USPSDataset(DATA_DIR, "test", num_shards=5, shard_id=-1)
182    with pytest.raises(ValueError, match=error_msg_5):
183        ds.USPSDataset(DATA_DIR, "train", num_shards=5, shard_id=5)
184        ds.USPSDataset(DATA_DIR, "test", num_shards=5, shard_id=5)
185    with pytest.raises(ValueError, match=error_msg_5):
186        ds.USPSDataset(DATA_DIR, "train", num_shards=2, shard_id=5)
187        ds.USPSDataset(DATA_DIR, "test", num_shards=2, shard_id=5)
188
189    error_msg_6 = "num_parallel_workers exceeds"
190    with pytest.raises(ValueError, match=error_msg_6):
191        ds.USPSDataset(DATA_DIR, "train", shuffle=False, num_parallel_workers=0)
192        ds.USPSDataset(DATA_DIR, "test", shuffle=False, num_parallel_workers=0)
193    with pytest.raises(ValueError, match=error_msg_6):
194        ds.USPSDataset(DATA_DIR, "train", shuffle=False, num_parallel_workers=256)
195        ds.USPSDataset(DATA_DIR, "test", shuffle=False, num_parallel_workers=256)
196    with pytest.raises(ValueError, match=error_msg_6):
197        ds.USPSDataset(DATA_DIR, "train", shuffle=False, num_parallel_workers=-2)
198        ds.USPSDataset(DATA_DIR, "test", shuffle=False, num_parallel_workers=-2)
199
200    error_msg_7 = "Argument shard_id"
201    with pytest.raises(TypeError, match=error_msg_7):
202        ds.USPSDataset(DATA_DIR, "train", num_shards=2, shard_id="0")
203        ds.USPSDataset(DATA_DIR, "test", num_shards=2, shard_id="0")
204
205    error_msg_8 = "invalid input shape"
206    with pytest.raises(RuntimeError, match=error_msg_8):
207        train_data = ds.USPSDataset(DATA_DIR, "train")
208        train_data = train_data.map(operations=vision.Decode(), input_columns=["image"], num_parallel_workers=1)
209        for _ in train_data.__iter__():
210            pass
211
212        test_data = ds.USPSDataset(DATA_DIR, "test")
213        test_data = test_data.map(operations=vision.Decode(), input_columns=["image"], num_parallel_workers=1)
214        for _ in test_data.__iter__():
215            pass
216
217    error_msg_9 = "failed to find USPS train data file"
218    with pytest.raises(RuntimeError, match=error_msg_9):
219        train_data = ds.USPSDataset(WRONG_DIR, "train")
220        for _ in train_data.__iter__():
221            pass
222    error_msg_10 = "failed to find USPS test data file"
223    with pytest.raises(RuntimeError, match=error_msg_10):
224        test_data = ds.USPSDataset(WRONG_DIR, "test")
225        for _ in test_data.__iter__():
226            pass
227
228
229def test_usps_visualize(plot=False):
230    """
231    Visualize USPSDataset results
232    """
233    logger.info("Test USPSDataset visualization")
234
235    train_data = ds.USPSDataset(DATA_DIR, "train", num_samples=3, shuffle=False)
236    num_iter = 0
237    image_list, label_list = [], []
238    for item in train_data.create_dict_iterator(num_epochs=1, output_numpy=True):
239        image = item["image"]
240        label = item["label"]
241        image_list.append(image)
242        label_list.append("label {}".format(label))
243        assert isinstance(image, np.ndarray)
244        assert image.shape == (16, 16, 1)
245        assert image.dtype == np.uint8
246        assert label.dtype == np.uint32
247        num_iter += 1
248    assert num_iter == 3
249    if plot:
250        visualize_dataset(image_list, label_list)
251
252    test_data = ds.USPSDataset(DATA_DIR, "test", num_samples=3, shuffle=False)
253    num_iter = 0
254    image_list, label_list = [], []
255    for item in test_data.create_dict_iterator(num_epochs=1, output_numpy=True):
256        image = item["image"]
257        label = item["label"]
258        image_list.append(image)
259        label_list.append("label {}".format(label))
260        assert isinstance(image, np.ndarray)
261        assert image.shape == (16, 16, 1)
262        assert image.dtype == np.uint8
263        assert label.dtype == np.uint32
264        num_iter += 1
265    assert num_iter == 3
266    if plot:
267        visualize_dataset(image_list, label_list)
268
269
270def test_usps_usage():
271    """
272    Validate USPSDataset image readings
273    """
274    logger.info("Test USPSDataset usage flag")
275
276    def test_config(usage, path=None):
277        path = DATA_DIR if path is None else path
278        try:
279            data = ds.USPSDataset(path, usage=usage, shuffle=False)
280            num_rows = 0
281            for _ in data.create_dict_iterator(num_epochs=1, output_numpy=True):
282                num_rows += 1
283        except (ValueError, TypeError, RuntimeError) as e:
284            return str(e)
285        return num_rows
286
287    assert test_config("train") == 3
288    assert test_config("test") == 3
289
290    assert "usage is not within the valid set of ['train', 'test', 'all']" in test_config("invalid")
291    assert "Argument usage with value ['list'] is not of type [<class 'str'>]" in test_config(["list"])
292
293    # change this directory to the folder that contains all USPS files
294    all_files_path = None
295    # the following tests on the entire datasets
296    if all_files_path is not None:
297        assert test_config("train", all_files_path) == 3
298        assert test_config("test", all_files_path) == 3
299        assert ds.USPSDataset(all_files_path, usage="train").get_dataset_size() == 3
300        assert ds.USPSDataset(all_files_path, usage="test").get_dataset_size() == 3
301
302
303if __name__ == '__main__':
304    test_usps_content_check()
305    test_usps_basic()
306    test_usps_exception()
307    test_usps_visualize(plot=True)
308    test_usps_usage()
309