• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 Huawei Technologies Co., Ltd
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""
16Testing TimeStretch op in DE
17"""
18import numpy as np
19import pytest
20
21import mindspore.dataset as ds
22import mindspore.dataset.audio.transforms as c_audio
23from mindspore import log as logger
24
25CHANNEL_NUM = 2
26FREQ = 1025
27FRAME_NUM = 300
28COMPLEX = 2
29
30
def gen(shape):
    """
    Yield one single-column row of reproducible pseudo-random float32 data.

    Args:
        shape: Output dimensions, forwarded unchanged to np.random.random.

    Yields:
        tuple, a 1-tuple whose only element is a float32 ndarray of the requested shape.
    """
    # Seed NumPy's global generator right before sampling so every run sees identical values.
    np.random.seed(0)
    samples = np.random.random(shape)
    row = (np.asarray(samples, dtype=np.float32),)
    yield row
35
36
def count_unequal_element(data_expected, data_me, rtol, atol):
    """
    Assert that the share of out-of-tolerance elements is smaller than rtol.

    An element is out of tolerance when
    |data_expected - data_me| > atol + |data_expected| * rtol.

    Args:
        data_expected: Reference array.
        data_me: Array under test; must have the same shape as data_expected.
        rtol: Relative tolerance; also the upper bound on the mismatch ratio.
        atol: Absolute tolerance.

    Raises:
        AssertionError: If the shapes differ or the mismatch ratio is not below rtol.
    """
    assert data_expected.shape == data_me.shape
    flattened = data_expected.flatten()
    element_total = len(flattened)
    abs_diff = np.abs(data_expected - data_me)
    # Per-element allowance, scaled by the magnitude of the reference value.
    allowance = atol + np.abs(data_expected) * rtol
    out_of_tol = np.greater(abs_diff, allowance)
    mismatch_total = np.count_nonzero(out_of_tol)
    mismatch_ratio = mismatch_total / element_total
    assert mismatch_ratio < rtol, "\ndata_expected_std:{0}\ndata_me_error:{1}\nloss:{2}".format(
        data_expected[out_of_tol], data_me[out_of_tol], abs_diff[out_of_tol])
45
46
def allclose_nparray(data_expected, data_me, rtol, atol, equal_nan=True):
    """
    Check that data_me matches data_expected within the given tolerances.

    When data_expected contains NaN the comparison is strict: np.allclose must
    succeed. Otherwise a failed np.allclose is handed to count_unequal_element,
    which tolerates a small ratio of mismatching elements.

    Args:
        data_expected: Reference array.
        data_me: Array under test.
        rtol: Relative tolerance.
        atol: Absolute tolerance.
        equal_nan: Whether NaNs in matching positions compare equal. Default: True.

    Raises:
        AssertionError: If the arrays are not close enough.
    """
    expected_has_nan = np.any(np.isnan(data_expected))
    is_close = np.allclose(data_me, data_expected, rtol, atol, equal_nan=equal_nan)

    if expected_has_nan:
        assert is_close
        return

    if not is_close:
        count_unequal_element(data_expected, data_me, rtol, atol)
52
53
def test_time_stretch_pipeline():
    """
    Test TimeStretch op. Pipeline.

    Builds a GeneratorDataset from one generated spectrogram of shape
    (CHANNEL_NUM, FREQ, FRAME_NUM, COMPLEX), maps TimeStretch(512, FREQ, 1.3)
    over it and checks that every produced row has ceil(FRAME_NUM / 1.3) frames.
    """
    logger.info("test TimeStretch op")
    generator = gen([CHANNEL_NUM, FREQ, FRAME_NUM, COMPLEX])
    data1 = ds.GeneratorDataset(source=generator, column_names=["multi_dimensional_data"])

    transforms = [c_audio.TimeStretch(512, FREQ, 1.3)]
    data1 = data1.map(operations=transforms, input_columns=["multi_dimensional_data"])

    expected_shape = (CHANNEL_NUM, FREQ, np.ceil(FRAME_NUM / 1.3), COMPLEX)
    num_rows = 0
    for item in data1.create_dict_iterator(num_epochs=1, output_numpy=True):
        # Check each row as it arrives instead of relying on a variable leaked from the loop.
        assert item["multi_dimensional_data"].shape == expected_shape
        num_rows += 1
    # Without this guard an empty pipeline would raise NameError (or pass vacuously).
    assert num_rows > 0, "TimeStretch pipeline produced no rows"
68
69
def test_time_stretch_pipeline_invalid_param():
    """
    Test TimeStretch op. Set invalid param. Pipeline.

    A negative fixed_rate (-1.3) must be rejected with a ValueError whose
    message names the allowed interval (0, 16777216].
    """
    logger.info("test TimeStretch op with invalid values")
    generator = gen([CHANNEL_NUM, FREQ, FRAME_NUM, COMPLEX])
    data1 = ds.GeneratorDataset(source=generator, column_names=["multi_dimensional_data"])

    with pytest.raises(ValueError, match=r"Input fixed_rate is not within the required interval of \(0, 16777216\]."):
        # NOTE(review): the error is presumably raised by the constructor itself; the map and
        # iteration steps are kept inside the context so the test still passes if validation
        # is deferred to pipeline execution - confirm and narrow if desired.
        transforms = [c_audio.TimeStretch(512, FREQ, -1.3)]
        data1 = data1.map(operations=transforms, input_columns=["multi_dimensional_data"])

        # Drain the pipeline; no output assertions belong here because any statement
        # following the raising call inside pytest.raises can never affect the result.
        for _ in data1.create_dict_iterator(num_epochs=1, output_numpy=True):
            pass
85
86
def test_time_stretch_eager():
    """
    Test TimeStretch op. Set param. Eager.

    Applies TimeStretch(512, FREQ, 1.3) directly to one generated spectrogram
    and checks that the frame axis becomes ceil(FRAME_NUM / 1.3).
    """
    logger.info("test TimeStretch op with customized parameter values")
    input_shape = [CHANNEL_NUM, FREQ, FRAME_NUM, COMPLEX]
    # gen yields a 1-tuple; unpack its only column.
    (spectrogram,) = next(gen(input_shape))

    stretch_op = c_audio.TimeStretch(512, FREQ, 1.3)
    out_put = stretch_op(spectrogram)

    expected_shape = (CHANNEL_NUM, FREQ, np.ceil(FRAME_NUM / 1.3), COMPLEX)
    assert out_put.shape == expected_shape
95
96
def test_percision_time_stretch_eager():
    """
    Test TimeStretch op. Compare precision. Eager.

    Runs TimeStretch(64, 2, 1.6) on a fixed (2, 2, 3, 2) float64 spectrogram and
    compares the result with precomputed reference values using rtol = atol = 0.001.
    """
    logger.info("test TimeStretch op with default values")
    spectrogram = np.array([[[[1.0402449369430542, 0.3807601034641266],
                              [-1.120057225227356, -0.12819576263427734],
                              [1.4303032159805298, -0.08839055150747299]],
                             [[1.4198592901229858, 0.6900091767311096],
                              [-1.8593409061431885, 0.16363371908664703],
                              [-2.3349387645721436, -1.4366451501846313]]],
                            [[[-0.7083967328071594, 0.9325454831123352],
                              [-1.9133838415145874, 0.011225821450352669],
                              [1.477278232574463, -1.0551637411117554]],
                             [[-0.6668586134910583, -0.23143270611763],
                              [-2.4390718936920166, 0.17638640105724335],
                              [-0.4795735776424408, 0.1345423310995102]]]]).astype(np.float64)
    out_expect = np.array([[[[1.0402449369430542, 0.3807601034641266],
                             [-1.302264928817749, -0.1490504890680313]],
                            [[1.4198592901229858, 0.6900091767311096],
                             [-2.382312774658203, 0.2096325159072876]]],
                           [[[-0.7083966732025146, 0.9325454831123352],
                             [-1.8545820713043213, 0.010880803689360619]],
                            [[-0.6668586134910583, -0.23143276572227478],
                             [-1.2737033367156982, 0.09211209416389465]]]]).astype(np.float64)
    out_ms = c_audio.TimeStretch(64, 2, 1.6)(spectrogram)

    # allclose_nparray takes (data_expected, data_me, ...): pass the reference first so the
    # relative tolerance is scaled by the reference values and failure output is labelled correctly.
    allclose_nparray(out_expect, out_ms, 0.001, 0.001)
125
126
if __name__ == '__main__':
    # Script entry point: run every test of this module in definition order.
    for _test_case in (
            test_time_stretch_pipeline,
            test_time_stretch_pipeline_invalid_param,
            test_time_stretch_eager,
            test_percision_time_stretch_eager,
    ):
        _test_case()
132