1# Copyright 2021 Huawei Technologies Co., Ltd 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15""" 16Testing TimeMasking op in DE. 17""" 18 19import numpy as np 20import pytest 21 22import mindspore.dataset as ds 23import mindspore.dataset.audio.transforms as audio 24from mindspore import log as logger 25 26CHANNEL = 2 27FREQ = 20 28TIME = 30 29 30 31def gen(shape): 32 np.random.seed(0) 33 data = np.random.random(shape) 34 yield (np.array(data, dtype=np.float32),) 35 36 37def count_unequal_element(data_expected, data_me, rtol, atol): 38 """ Precision calculation func """ 39 assert data_expected.shape == data_me.shape 40 total_count = len(data_expected.flatten()) 41 error = np.abs(data_expected - data_me) 42 greater = np.greater(error, atol + np.abs(data_expected) * rtol) 43 loss_count = np.count_nonzero(greater) 44 assert (loss_count / total_count) < rtol, "\ndata_expected_std:{0}\ndata_me_error:{1}\nloss:{2}".format( 45 data_expected[greater], data_me[greater], error[greater]) 46 47 48def allclose_nparray(data_expected, data_me, rtol, atol, equal_nan=True): 49 """ Precision calculation formula """ 50 if np.any(np.isnan(data_expected)): 51 assert np.allclose(data_me, data_expected, rtol, atol, equal_nan=equal_nan) 52 elif not np.allclose(data_me, data_expected, rtol, atol, equal_nan=equal_nan): 53 count_unequal_element(data_expected, data_me, rtol, atol) 54 55 56def test_func_time_masking_eager_random_input(): 57 """ mindspore eager mode normal testcase:time_masking op""" 58 logger.info("test time_masking op") 59 spectrogram = next(gen((CHANNEL, FREQ, TIME)))[0] 60 out_put = audio.TimeMasking(False, 3, 1, 10)(spectrogram) 61 assert out_put.shape == (CHANNEL, FREQ, TIME) 62 63 64def test_func_time_masking_eager_precision(): 65 """ mindspore eager mode normal testcase:time_masking op""" 66 logger.info("test time_masking op") 67 spectrogram = np.array([[[0.17274511, 0.85174704, 0.07162686, -0.45436913], 68 [-1.045921, -1.8204843, 0.62333095, -0.09532598], 69 [1.8175547, -0.25779432, -0.58152324, -0.00221091]], 70 [[-1.205032, 0.18922766, -0.5277673, -1.3090396], 71 [1.8914849, -0.97001046, -0.23726775, 0.00525892], 72 [-1.0271876, 0.33526883, 1.7413973, 0.12313101]]]).astype(np.float32) 73 out_ms = audio.TimeMasking(False, 2, 0, 0)(spectrogram) 74 out_benchmark = np.array([[[0., 0., 0.07162686, -0.45436913], 75 [0., 0., 0.62333095, -0.09532598], 76 [0., 0., -0.58152324, -0.00221091]], 77 [[0., 0., -0.5277673, -1.3090396], 78 [0., 0., -0.23726775, 0.00525892], 79 [0., 0., 1.7413973, 0.12313101]]]).astype(np.float32) 80 allclose_nparray(out_ms, out_benchmark, 0.0001, 0.0001) 81 82 83def test_func_time_masking_pipeline(): 84 """ mindspore pipeline mode normal testcase:time_masking op""" 85 logger.info("test time_masking op, pipeline") 86 87 generator = gen([CHANNEL, FREQ, TIME]) 88 data1 = ds.GeneratorDataset(source=generator, column_names=["multi_dimensional_data"]) 89 90 transforms = [audio.TimeMasking(True, 8)] 91 data1 = data1.map(operations=transforms, input_columns=["multi_dimensional_data"]) 92 93 for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True): 94 out_put = item["multi_dimensional_data"] 95 assert out_put.shape == (CHANNEL, FREQ, TIME) 96 97 98def test_time_masking_invalid_input(): 99 def test_invalid_param(test_name, iid_masks, time_mask_param, mask_start, error, error_msg): 100 logger.info("Test TimeMasking with wrong params: {0}".format(test_name)) 101 with pytest.raises(error) as error_info: 102 audio.TimeMasking(iid_masks, time_mask_param, mask_start) 103 assert error_msg in str(error_info.value) 104 105 def test_invalid_input(test_name, iid_masks, time_mask_param, mask_start, error, error_msg): 106 logger.info("Test TimeMasking with wrong params: {0}".format(test_name)) 107 with pytest.raises(error) as error_info: 108 spectrogram = next(gen((CHANNEL, FREQ, TIME)))[0] 109 _ = audio.TimeMasking(iid_masks, time_mask_param, mask_start)(spectrogram) 110 assert error_msg in str(error_info.value) 111 112 test_invalid_param("invalid mask_start", True, 2, -10, ValueError, 113 "Input mask_start is not within the required interval of [0, 16777216].") 114 test_invalid_param("invalid mask_param", True, -2, 10, ValueError, 115 "Input mask_param is not within the required interval of [0, 16777216].") 116 test_invalid_param("invalid iid_masks", "True", 2, 10, TypeError, 117 "Argument iid_masks with value True is not of type [<class 'bool'>], but got <class 'str'>.") 118 119 test_invalid_input("invalid mask_start", False, 2, 100, RuntimeError, 120 "MaskAlongAxis: mask_start should be less than the length of chosen dimension.") 121 test_invalid_input("invalid mask_width", False, 200, 2, RuntimeError, 122 "TimeMasking: time_mask_param should be less than or equal to the length of time dimension.") 123 124 125if __name__ == "__main__": 126 test_func_time_masking_eager_random_input() 127 test_func_time_masking_eager_precision() 128 test_func_time_masking_pipeline() 129 test_time_masking_invalid_input() 130