# Copyright 2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """ Test USPS dataset operators """ import os import matplotlib.pyplot as plt import numpy as np import pytest from PIL import Image import mindspore.dataset as ds import mindspore.dataset.vision.c_transforms as vision from mindspore import log as logger DATA_DIR = "../data/dataset/testSBUDataset" WRONG_DIR = "../data/dataset/testMnistData" def load_sbu(path): """ load SBU data """ images = [] captions = [] file1 = os.path.realpath(os.path.join(path, 'SBU_captioned_photo_dataset_urls.txt')) file2 = os.path.realpath(os.path.join(path, 'SBU_captioned_photo_dataset_captions.txt')) for line1, line2 in zip(open(file1), open(file2)): url = line1.rstrip() image = url[23:].replace("/", "_") filename = os.path.join(path, 'sbu_images', image) if os.path.exists(filename): caption = line2.rstrip() images.append(np.asarray(Image.open(filename).convert('RGB')).astype(np.uint8)) captions.append(caption) return images, captions def visualize_dataset(images, captions): """ Helper function to visualize the dataset samples """ num_samples = len(images) for i in range(num_samples): plt.subplot(1, num_samples, i + 1) plt.imshow(images[i].squeeze()) plt.title(captions[i]) plt.show() def test_sbu_content_check(): """ Validate SBUDataset image readings """ logger.info("Test SBUDataset Op with content check") dataset = ds.SBUDataset(DATA_DIR, decode=True, num_samples=50, shuffle=False) images, captions = load_sbu(DATA_DIR) num_iter = 0 # in this example, each dictionary has keys "image" and "caption" for i, data in enumerate(dataset.create_dict_iterator(num_epochs=1, output_numpy=True)): assert data["image"].shape == images[i].shape assert data["caption"].item().decode("utf8") == captions[i] num_iter += 1 assert num_iter == 5 def test_sbu_case(): """ Validate SBUDataset cases """ dataset = ds.SBUDataset(DATA_DIR, decode=True) dataset = dataset.map(operations=[vision.Resize((224, 224))], input_columns=["image"]) repeat_num = 4 dataset = dataset.repeat(repeat_num) batch_size = 2 dataset = dataset.batch(batch_size, drop_remainder=True, pad_info={}) num = 0 for _ in dataset.create_dict_iterator(num_epochs=1, output_numpy=True): num += 1 # 4 x 5 / 2 assert num == 10 dataset = ds.SBUDataset(DATA_DIR, decode=False) dataset = dataset.map(operations=[vision.Decode(rgb=True), vision.Resize((224, 224))], input_columns=["image"]) repeat_num = 4 dataset = dataset.repeat(repeat_num) batch_size = 2 dataset = dataset.batch(batch_size, drop_remainder=True, pad_info={}) num = 0 for _ in dataset.create_dict_iterator(num_epochs=1, output_numpy=True): num += 1 # 4 x 5 / 2 assert num == 10 def test_sbu_basic(): """ Validate SBUDataset """ logger.info("Test SBUDataset Op") # case 1: test loading whole dataset dataset = ds.SBUDataset(DATA_DIR, decode=True) num_iter = 0 for _ in dataset.create_dict_iterator(num_epochs=1, output_numpy=True): num_iter += 1 assert num_iter == 5 # case 2: test num_samples dataset = ds.SBUDataset(DATA_DIR, decode=True, num_samples=5) num_iter = 0 for _ in dataset.create_dict_iterator(num_epochs=1, output_numpy=True): num_iter += 1 assert num_iter == 5 # case 3: test repeat dataset = ds.SBUDataset(DATA_DIR, decode=True, num_samples=5) dataset = dataset.repeat(5) num_iter = 0 for _ in dataset.create_dict_iterator(num_epochs=1, output_numpy=True): num_iter += 1 assert num_iter == 25 # case 4: test batch dataset = ds.SBUDataset(DATA_DIR, decode=True, num_samples=5) assert dataset.get_dataset_size() == 5 assert dataset.get_batch_size() == 1 num_iter = 0 for _ in dataset.create_dict_iterator(num_epochs=1, output_numpy=True): num_iter += 1 assert num_iter == 5 # case 5: test get_class_indexing dataset = ds.SBUDataset(DATA_DIR, decode=True, num_samples=5) assert dataset.get_class_indexing() == {} # case 6: test get_col_names dataset = ds.SBUDataset(DATA_DIR, decode=True, num_samples=5) assert dataset.get_col_names() == ["image", "caption"] def test_sbu_sequential_sampler(): """ Test SBUDataset with SequentialSampler """ logger.info("Test SBUDataset Op with SequentialSampler") num_samples = 5 sampler = ds.SequentialSampler(num_samples=num_samples) dataset_1 = ds.SBUDataset(DATA_DIR, decode=True, sampler=sampler) dataset_2 = ds.SBUDataset(DATA_DIR, decode=True, shuffle=False, num_samples=num_samples) num_iter = 0 for item1, item2 in zip(dataset_1.create_dict_iterator(num_epochs=1, output_numpy=True), dataset_2.create_dict_iterator(num_epochs=1, output_numpy=True)): np.testing.assert_array_equal(item1["caption"], item2["caption"]) num_iter += 1 assert num_iter == num_samples def test_sbu_exception(): """ Test error cases for SBUDataset """ logger.info("Test error cases for SBUDataset") error_msg_1 = "sampler and shuffle cannot be specified at the same time" with pytest.raises(RuntimeError, match=error_msg_1): ds.SBUDataset(DATA_DIR, decode=True, shuffle=False, sampler=ds.SequentialSampler()) error_msg_2 = "sampler and sharding cannot be specified at the same time" with pytest.raises(RuntimeError, match=error_msg_2): ds.SBUDataset(DATA_DIR, decode=True, sampler=ds.SequentialSampler(), num_shards=2, shard_id=0) error_msg_3 = "num_shards is specified and currently requires shard_id as well" with pytest.raises(RuntimeError, match=error_msg_3): ds.SBUDataset(DATA_DIR, decode=True, num_shards=10) error_msg_4 = "shard_id is specified but num_shards is not" with pytest.raises(RuntimeError, match=error_msg_4): ds.SBUDataset(DATA_DIR, decode=True, shard_id=0) error_msg_5 = "Input shard_id is not within the required interval" with pytest.raises(ValueError, match=error_msg_5): ds.SBUDataset(DATA_DIR, decode=True, num_shards=5, shard_id=-1) with pytest.raises(ValueError, match=error_msg_5): ds.SBUDataset(DATA_DIR, decode=True, num_shards=5, shard_id=5) with pytest.raises(ValueError, match=error_msg_5): ds.SBUDataset(DATA_DIR, decode=True, num_shards=2, shard_id=5) error_msg_6 = "num_parallel_workers exceeds" with pytest.raises(ValueError, match=error_msg_6): ds.SBUDataset(DATA_DIR, decode=True, shuffle=False, num_parallel_workers=0) with pytest.raises(ValueError, match=error_msg_6): ds.SBUDataset(DATA_DIR, decode=True, shuffle=False, num_parallel_workers=256) with pytest.raises(ValueError, match=error_msg_6): ds.SBUDataset(DATA_DIR, decode=True, shuffle=False, num_parallel_workers=-2) error_msg_7 = "Argument shard_id" with pytest.raises(TypeError, match=error_msg_7): ds.SBUDataset(DATA_DIR, decode=True, num_shards=2, shard_id="0") def exception_func(item): raise Exception("Error occur!") error_msg_8 = "The corresponding data files" with pytest.raises(RuntimeError, match=error_msg_8): dataset = ds.SBUDataset(DATA_DIR, decode=True) dataset = dataset.map(operations=exception_func, input_columns=["image"], num_parallel_workers=1) for _ in dataset.__iter__(): pass with pytest.raises(RuntimeError, match=error_msg_8): dataset = ds.SBUDataset(DATA_DIR, decode=True) dataset = dataset.map(operations=vision.Decode(), input_columns=["image"], num_parallel_workers=1) for _ in dataset.__iter__(): pass error_msg_9 = "does not exist or permission denied" with pytest.raises(ValueError, match=error_msg_9): dataset = ds.SBUDataset(WRONG_DIR, decode=True) for _ in dataset.__iter__(): pass error_msg_10 = "Argument decode with value" with pytest.raises(TypeError, match=error_msg_10): dataset = ds.SBUDataset(DATA_DIR, decode="not_bool") for _ in dataset.__iter__(): pass def test_sbu_visualize(plot=False): """ Visualize SBUDataset results """ logger.info("Test SBUDataset visualization") dataset = ds.SBUDataset(DATA_DIR, decode=True, num_samples=10, shuffle=False) num_iter = 0 image_list, caption_list = [], [] for item in dataset.create_dict_iterator(num_epochs=1, output_numpy=True): image = item["image"] caption = item["caption"].item().decode("utf8") image_list.append(image) caption_list.append("caption {}".format(caption)) assert isinstance(image, np.ndarray) assert image.dtype == np.uint8 assert isinstance(caption, str) num_iter += 1 assert num_iter == 5 if plot: visualize_dataset(image_list, caption_list) def test_sbu_decode(): """ Validate SBUDataset image readings """ logger.info("Test SBUDataset decode flag") sampler = ds.SequentialSampler(num_samples=50) dataset = ds.SBUDataset(dataset_dir=DATA_DIR, decode=False, sampler=sampler) dataset_1 = dataset.map(operations=[vision.Decode(rgb=True)], input_columns=["image"]) dataset_2 = ds.SBUDataset(dataset_dir=DATA_DIR, decode=True, sampler=sampler) num_iter = 0 for item1, item2 in zip(dataset_1.create_dict_iterator(num_epochs=1, output_numpy=True), dataset_2.create_dict_iterator(num_epochs=1, output_numpy=True)): np.testing.assert_array_equal(item1["caption"], item2["caption"]) num_iter += 1 assert num_iter == 5 if __name__ == '__main__': test_sbu_content_check() test_sbu_basic() test_sbu_case() test_sbu_sequential_sampler() test_sbu_exception() test_sbu_visualize(plot=True) test_sbu_decode()