1# Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Tests for reconstruction_ops.""" 16 17from absl.testing import parameterized 18import numpy as np 19 20from tensorflow.python.eager import context 21from tensorflow.python.framework import constant_op 22from tensorflow.python.framework import dtypes 23from tensorflow.python.framework import test_util 24from tensorflow.python.ops import array_ops 25from tensorflow.python.ops import gradient_checker_v2 26from tensorflow.python.ops import gradients_impl 27from tensorflow.python.ops import math_ops 28from tensorflow.python.ops.signal import reconstruction_ops 29from tensorflow.python.platform import test 30 31 32@test_util.run_all_in_graph_and_eager_modes 33class ReconstructionOpsTest(test.TestCase, parameterized.TestCase): 34 35 def __init__(self, *args, **kwargs): 36 super(ReconstructionOpsTest, self).__init__(*args, **kwargs) 37 self.batch_size = 3 38 self.frames = 3 39 self.samples = 5 40 41 self.bases = np.array(range(2, 5)) 42 exponents = np.array(range(self.frames * self.samples)) 43 powers = np.power(self.bases[:, np.newaxis], exponents[np.newaxis, :]) 44 45 self.powers = np.reshape(powers, [self.batch_size, self.frames, 46 self.samples]) 47 self.frame_hop = 2 48 49 # Hand computed example using powers of unique numbers: this is easily 50 # verified. 51 self.expected_string = ["1", "10", "100100", "1001000", "10010010000", 52 "100100000000", "1001000000000", "10000000000000", 53 "100000000000000"] 54 55 def test_all_ones(self): 56 signal = array_ops.ones([3, 5]) 57 reconstruction = reconstruction_ops.overlap_and_add(signal, 2) 58 self.assertEqual(reconstruction.shape.as_list(), [9]) 59 expected_output = np.array([1, 1, 2, 2, 3, 2, 2, 1, 1]) 60 self.assertAllClose(reconstruction, expected_output) 61 62 def test_unknown_shapes(self): 63 # This test uses placeholders and does not work in Eager mode. 64 if context.executing_eagerly(): 65 return 66 signal = array_ops.placeholder_with_default( 67 np.ones((4, 3, 5)).astype(np.int32), shape=[None, None, None]) 68 frame_step = array_ops.placeholder_with_default(2, shape=[]) 69 reconstruction = reconstruction_ops.overlap_and_add(signal, frame_step) 70 self.assertEqual(reconstruction.shape.as_list(), [None, None]) 71 expected_output = np.array([[1, 1, 2, 2, 3, 2, 2, 1, 1]] * 4) 72 self.assertAllClose(reconstruction, expected_output) 73 74 def test_unknown_rank(self): 75 # This test uses placeholders and does not work in eager mode. 76 if context.executing_eagerly(): 77 return 78 signal = array_ops.placeholder_with_default( 79 np.ones((4, 3, 5)).astype(np.int32), shape=None) 80 frame_step = array_ops.placeholder_with_default(2, shape=[]) 81 reconstruction = reconstruction_ops.overlap_and_add(signal, frame_step) 82 83 self.assertEqual(reconstruction.shape, None) 84 expected_output = np.array([[1, 1, 2, 2, 3, 2, 2, 1, 1]] * 4) 85 self.assertAllClose(reconstruction, expected_output) 86 87 def test_fast_path(self): 88 # This test uses tensor names and does not work in eager mode. 89 if context.executing_eagerly(): 90 return 91 signal = array_ops.ones([3, 5]) 92 frame_step = 5 93 reconstruction = reconstruction_ops.overlap_and_add(signal, frame_step) 94 self.assertEqual(reconstruction.name, "overlap_and_add/fast_path:0") 95 expected_output = np.ones([15]) 96 self.assertAllClose(reconstruction, expected_output) 97 98 @parameterized.parameters( 99 # All hop lengths on a frame length of 2. 100 (2, [1, 5, 9, 6], 1), 101 (2, [1, 2, 3, 4, 5, 6], 2), 102 103 # All hop lengths on a frame length of 3. 104 (3, [1, 6, 15, 14, 9], 1), 105 (3, [1, 2, 7, 5, 13, 8, 9], 2), 106 (3, [1, 2, 3, 4, 5, 6, 7, 8, 9], 3), 107 108 # All hop lengths on a frame length of 4. 109 (4, [1, 7, 18, 21, 19, 12], 1), 110 (4, [1, 2, 8, 10, 16, 18, 11, 12], 2), 111 (4, [1, 2, 3, 9, 6, 7, 17, 10, 11, 12], 3), 112 (4, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], 4)) 113 def test_simple(self, frame_length, expected, frame_hop): 114 def make_input(frame_length, num_frames=3): 115 """Generate a tensor of num_frames frames of frame_length.""" 116 return np.reshape(np.arange(1, num_frames * frame_length + 1), 117 (-1, frame_length)) 118 signal = make_input(frame_length) 119 reconstruction = reconstruction_ops.overlap_and_add( 120 np.array(signal), frame_hop) 121 expected_output = np.array(expected) 122 self.assertAllClose(reconstruction, expected_output) 123 124 def test_powers(self): 125 signal = constant_op.constant(np.squeeze(self.powers[0, :, :]), 126 dtype=dtypes.int64) 127 reconstruction = reconstruction_ops.overlap_and_add(signal, self.frame_hop) 128 129 output = self.evaluate(reconstruction) 130 string_output = [np.base_repr(x, self.bases[0]) for x in output] 131 self.assertEqual(string_output, self.expected_string) 132 133 def test_batch(self): 134 signal = constant_op.constant(self.powers, dtype=dtypes.int64) 135 reconstruction = reconstruction_ops.overlap_and_add(signal, self.frame_hop) 136 137 output = self.evaluate(reconstruction) 138 139 accumulator = True 140 for i in range(self.batch_size): 141 string_output = [np.base_repr(x, self.bases[i]) for x in output[i, :]] 142 accumulator = accumulator and (string_output == self.expected_string) 143 144 self.assertTrue(accumulator) 145 146 def test_one_element_batch(self): 147 input_matrix = np.squeeze(self.powers[0, :, :]) 148 input_matrix = input_matrix[np.newaxis, :, :].astype(float) 149 signal = constant_op.constant(input_matrix, dtype=dtypes.float32) 150 reconstruction = reconstruction_ops.overlap_and_add(signal, self.frame_hop) 151 152 output = self.evaluate(reconstruction) 153 154 string_output = [np.base_repr(int(x), self.bases[0]) for x in 155 np.squeeze(output)] 156 157 self.assertEqual(output.shape, (1, 9)) 158 self.assertEqual(string_output, self.expected_string) 159 160 @parameterized.parameters( 161 ((1, 128), 1), 162 ((5, 35), 17), 163 ((10, 128), 128), 164 ((2, 10, 128), 127), 165 ((2, 2, 10, 128), 126), 166 ((2, 2, 2, 10, 128), 125)) 167 def test_gradient(self, shape, frame_hop): 168 # TODO(rjryan): Eager gradient tests. 169 if context.executing_eagerly(): 170 return 171 signal = array_ops.zeros(shape) 172 reconstruction = reconstruction_ops.overlap_and_add(signal, frame_hop) 173 loss = math_ops.reduce_sum(reconstruction) 174 # Increasing any sample in the input frames by one will increase the sum 175 # of all the samples in the reconstruction by 1, so the gradient should 176 # be all ones, no matter the shape or hop. 177 gradient = self.evaluate(gradients_impl.gradients([loss], [signal])[0]) 178 self.assertTrue((gradient == 1.0).all()) 179 180 def test_gradient_batch(self): 181 # TODO(rjryan): Eager gradient tests. 182 if context.executing_eagerly(): 183 return 184 signal = array_ops.zeros((2, 10, 10)) 185 frame_hop = 10 186 reconstruction = reconstruction_ops.overlap_and_add(signal, frame_hop) 187 188 # Multiply the first batch-item's reconstruction by zeros. This will block 189 # gradient from flowing into the first batch item from the loss. Multiply 190 # the second batch item by the integers from 0 to 99. Since there is zero 191 # overlap, the gradient for this batch item will be 0-99 shaped as (10, 192 # 10). 193 reconstruction *= array_ops.stack( 194 [array_ops.zeros((100,)), 195 math_ops.cast(math_ops.range(100), dtypes.float32)]) 196 loss = math_ops.reduce_sum(reconstruction) 197 198 # Verify that only the second batch item receives gradient. 199 gradient = self.evaluate(gradients_impl.gradients([loss], [signal])[0]) 200 expected_gradient = np.stack([ 201 np.zeros((10, 10)), 202 np.reshape(np.arange(100).astype(np.float32), (10, 10))]) 203 self.assertAllEqual(expected_gradient, gradient) 204 205 def test_gradient_numerical(self): 206 shape = (2, 10, 10) 207 framed_signal = array_ops.zeros(shape) 208 frame_hop = 10 209 def f(signal): 210 return reconstruction_ops.overlap_and_add(signal, frame_hop) 211 ((jacob_t,), (jacob_n,)) = gradient_checker_v2.compute_gradient( 212 f, [framed_signal]) 213 self.assertAllClose(jacob_t, jacob_n) 214 215 216if __name__ == "__main__": 217 test.main() 218