# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Testing TimeStretch op in DE
"""
import numpy as np
import pytest

import mindspore.dataset as ds
import mindspore.dataset.audio.transforms as c_audio
from mindspore import log as logger

# Shape of the generated spectrogram: (channel, freq, frame, complex), where the
# last dimension of size 2 holds the real and imaginary parts.
CHANNEL_NUM = 2
FREQ = 1025
FRAME_NUM = 300
COMPLEX = 2


def gen(shape):
    """Yield one random float32 array of the given shape."""
    np.random.seed(0)
    data = np.random.random(shape)
    yield (np.array(data, dtype=np.float32),)


def count_unequal_element(data_expected, data_me, rtol, atol):
    """Assert that the ratio of out-of-tolerance elements stays below rtol."""
    assert data_expected.shape == data_me.shape
    total_count = len(data_expected.flatten())
    error = np.abs(data_expected - data_me)
    greater = np.greater(error, atol + np.abs(data_expected) * rtol)
    loss_count = np.count_nonzero(greater)
    assert (loss_count / total_count) < rtol, "\ndata_expected:{0}\ndata_me:{1}\nerror:{2}".format(
        data_expected[greater], data_me[greater], error[greater])


def allclose_nparray(data_expected, data_me, rtol, atol, equal_nan=True):
    """Compare two arrays element-wise within the given tolerances."""
    if np.any(np.isnan(data_expected)):
        assert np.allclose(data_me, data_expected, rtol, atol, equal_nan=equal_nan)
    elif not np.allclose(data_me, data_expected, rtol, atol, equal_nan=equal_nan):
        count_unequal_element(data_expected, data_me, rtol, atol)


def test_time_stretch_pipeline():
    """
    Test TimeStretch op in pipeline mode.
    """
    logger.info("test TimeStretch op")
    generator = gen([CHANNEL_NUM, FREQ, FRAME_NUM, COMPLEX])
    data1 = ds.GeneratorDataset(source=generator, column_names=["multi_dimensional_data"])

    transforms = [c_audio.TimeStretch(512, FREQ, 1.3)]
    data1 = data1.map(operations=transforms, input_columns=["multi_dimensional_data"])

    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):
        output = item["multi_dimensional_data"]
        assert output.shape == (CHANNEL_NUM, FREQ, np.ceil(FRAME_NUM / 1.3), COMPLEX)


def test_time_stretch_pipeline_invalid_param():
    """
    Test TimeStretch op with an invalid parameter in pipeline mode.
    """
    logger.info("test TimeStretch op with invalid values")
    generator = gen([CHANNEL_NUM, FREQ, FRAME_NUM, COMPLEX])
    data1 = ds.GeneratorDataset(source=generator, column_names=["multi_dimensional_data"])

    with pytest.raises(ValueError, match=r"Input fixed_rate is not within the required interval of \(0, 16777216\]."):
        transforms = [c_audio.TimeStretch(512, FREQ, -1.3)]
        data1 = data1.map(operations=transforms, input_columns=["multi_dimensional_data"])

        for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):
            output = item["multi_dimensional_data"]
            assert output.shape == (CHANNEL_NUM, FREQ, np.ceil(FRAME_NUM / 1.3), COMPLEX)


def test_time_stretch_eager():
    """
    Test TimeStretch op with customized parameters in eager mode.
    """
    logger.info("test TimeStretch op with customized parameter values")
    spectrogram = next(gen([CHANNEL_NUM, FREQ, FRAME_NUM, COMPLEX]))[0]
    output = c_audio.TimeStretch(512, FREQ, 1.3)(spectrogram)
    assert output.shape == (CHANNEL_NUM, FREQ, np.ceil(FRAME_NUM / 1.3), COMPLEX)


def test_precision_time_stretch_eager():
    """
    Test TimeStretch op precision against expected values in eager mode.
    """
    logger.info("test TimeStretch op precision with customized parameter values")
    spectrogram = np.array([[[[1.0402449369430542, 0.3807601034641266],
                              [-1.120057225227356, -0.12819576263427734],
                              [1.4303032159805298, -0.08839055150747299]],
                             [[1.4198592901229858, 0.6900091767311096],
                              [-1.8593409061431885, 0.16363371908664703],
                              [-2.3349387645721436, -1.4366451501846313]]],
                            [[[-0.7083967328071594, 0.9325454831123352],
                              [-1.9133838415145874, 0.011225821450352669],
                              [1.477278232574463, -1.0551637411117554]],
                             [[-0.6668586134910583, -0.23143270611763],
                              [-2.4390718936920166, 0.17638640105724335],
                              [-0.4795735776424408, 0.1345423310995102]]]]).astype(np.float64)
    out_expect = np.array([[[[1.0402449369430542, 0.3807601034641266],
                             [-1.302264928817749, -0.1490504890680313]],
                            [[1.4198592901229858, 0.6900091767311096],
                             [-2.382312774658203, 0.2096325159072876]]],
                           [[[-0.7083966732025146, 0.9325454831123352],
                             [-1.8545820713043213, 0.010880803689360619]],
                            [[-0.6668586134910583, -0.23143276572227478],
                             [-1.2737033367156982, 0.09211209416389465]]]]).astype(np.float64)
    out_ms = c_audio.TimeStretch(64, 2, 1.6)(spectrogram)

    allclose_nparray(out_expect, out_ms, 0.001, 0.001)


if __name__ == '__main__':
    test_time_stretch_pipeline()
    test_time_stretch_pipeline_invalid_param()
    test_time_stretch_eager()
    test_precision_time_stretch_eager()