# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for atrous convolution functionality in tensorflow.ops.nn."""

import contextlib
import itertools

import numpy as np

from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gradient_checker
from tensorflow.python.ops import nn_ops
import tensorflow.python.ops.nn_grad  # pylint: disable=unused-import
from tensorflow.python.platform import test


def upsample_filters(filters, rate):
  """Upsamples the filters by a factor of rate along the spatial dimensions.

  Args:
    filters: spatial_shape + [in_channels, out_channels]
      Original filters.
    rate: A list of len(spatial_shape) positive ints, specifying the
      upsampling rate.

  Returns:
    filters_up: output_spatial_shape + [in_channels, out_channels].
      Upsampled filters with
      output_spatial_shape[i] = (spatial_shape[i] - 1) * rate[i] + 1
      containing (rate[i] - 1) zeros between consecutive filter values along
      spatial dimension i.
  """
  num_spatial_dims = len(rate)
  spatial_shape = np.array(filters.shape[:num_spatial_dims])
  # Explicit array conversion so the per-dimension multiply is elementwise.
  output_spatial_shape = (spatial_shape - 1) * np.asarray(rate) + 1
  output = np.zeros(
      tuple(output_spatial_shape) + tuple(filters.shape[-2:]), filters.dtype)
  # One strided slice per spatial axis picks the positions that receive the
  # original filter values; every position in between keeps its zero.
  output[tuple(np.s_[::r] for r in rate)] = filters
  return output


class AtrousConvolutionTest(test.TestCase):

  @contextlib.contextmanager
  def _delay_checks(self):
    """Context manager for combining checks depending on tensor evaluations.

    Each call to Session.run has some overhead, and this overhead can easily
    account for the majority of the time spent in tests that call Session.run
    (or Tensor.eval) many times.

    This context manager provides a mechanism for registering callback functions
    and associated tensors.  When the context is exited, all of the tensors
    associated with all of the registrations are evaluated with a single call to
    Session.run, and then each registered callback function is called with the
    values of its associated tensors.

    Yields:
      A function `add_check(check, *args, **kwargs)` where `check` is the
      callback function to be invoked, and `*args` and `**kwargs` specify the
      associated Tensors. When in EAGER mode, check is executed in add_check,
      otherwise, it's delayed after the context.
    """
    checks = []

    def add_check(check, *args, **kwargs):
      if context.executing_eagerly():
        # Nothing to batch in eager mode: evaluate and verify right away.
        args_val, kwargs_val = self.evaluate([args, kwargs])
        check(*args_val, **kwargs_val)
      else:
        checks.append((check, args, kwargs))

    yield add_check
    if not context.executing_eagerly():
      # Evaluate every registered tensor in one call, then replay the
      # callbacks in registration order with the evaluated values.
      all_values = self.evaluate([[args, kwargs] for _, args, kwargs in checks])
      for (check, _, _), (args, kwargs) in zip(checks, all_values):
        check(*args, **kwargs)

  def _assert_outputs_close(self, y1_eval, y2_eval):
    """Asserts that two evaluated outputs agree within rtol/atol of 1e-2."""
    self.assertAllClose(y1_eval, y2_eval, rtol=1e-2, atol=1e-2)

  def _test_atrous_convolution(self, add_check, input_shape, filter_shape,
                               dilation_rate, **kwargs):
    """Compares a dilated convolution with one using upsampled filters.

    Args:
      add_check: Registration function yielded by `_delay_checks`.
      input_shape: Shape of the generated input.
      filter_shape: Shape of the generated filters.
      dilation_rate: Dilation rate passed to `nn_ops.convolution` and used to
        upsample the reference filters.
      **kwargs: Extra keyword arguments forwarded to both `nn_ops.convolution`
        calls (the tests in this file pass `padding` and `data_format`).
    """
    filters = np.arange(
        np.prod(filter_shape), dtype=np.float32).reshape(filter_shape)
    filters_upsampled = upsample_filters(filters, dilation_rate)
    x = np.arange(np.prod(input_shape), dtype=np.float32).reshape(input_shape)
    y1 = nn_ops.convolution(
        input=x, filter=filters, dilation_rate=dilation_rate, **kwargs)
    y2 = nn_ops.convolution(input=x, filter=filters_upsampled, **kwargs)

    add_check(self._assert_outputs_close, y1, y2)

  @test_util.run_v1_only("b/120545219")
  def test_unknown_spatial_dims_for_channel_last_format(self):
    """Unknown NHWC spatial dims stay unknown in the static output shape."""
    x = array_ops.placeholder(dtypes.float32, [1, None, None, 10])
    w = array_ops.zeros([3, 3, 10, 20])
    y = nn_ops.convolution(
        x, w, "VALID", dilation_rate=[2, 2], data_format="NHWC")
    self.assertEqual(y.shape.as_list(), [1, None, None, 20])

  @test_util.run_v1_only("b/120545219")
  def test_unknown_spatial_dims_for_channel_first_format(self):
    """Unknown NCHW spatial dims stay unknown in the static output shape."""
    x = array_ops.placeholder(dtypes.float32, [1, 10, None, None])
    w = array_ops.zeros([3, 3, 10, 20])
    y = nn_ops.convolution(
        x, w, "VALID", dilation_rate=[2, 2], data_format="NCHW")
    self.assertEqual(y.shape.as_list(), [1, 20, None, None])

  @test_util.run_in_graph_and_eager_modes
  def testAtrousConvolution2D(self):
    """Sweeps padding, input size, kernel size and dilation rate in 2-D."""
    with self._delay_checks() as add_check:
      # itertools.product iterates in the same order as the equivalent
      # nested loops (rightmost argument varies fastest).
      for padding, input_size, kernel_size, dilation_rate in itertools.product(
          ["SAME", "VALID"],
          [[9, 9], [9, 10]],
          [[1, 1], [2, 2], [2, 3]],
          [[1, 1], [3, 2], [2, 1]]):
        self._test_atrous_convolution(
            add_check=add_check,
            input_shape=[2] + input_size + [2],
            filter_shape=kernel_size + [2, 2],
            padding=padding,
            dilation_rate=dilation_rate,
        )

  @test_util.run_in_graph_and_eager_modes
  def testAtrousConvolution3D(self):
    """Sweeps padding, input size, kernel size and dilation rate in 3-D."""
    with self._delay_checks() as add_check:
      for padding, input_size, kernel_size, dilation_rate in itertools.product(
          ["SAME", "VALID"],
          [[9, 9, 10], [9, 10, 9]],
          [[3, 3, 3], [3, 2, 2], [2, 1, 3]],
          [[1, 1, 1], [3, 3, 3], [3, 2, 3], [3, 1, 2]]):
        self._test_atrous_convolution(
            add_check=add_check,
            input_shape=[2] + input_size + [2],
            filter_shape=kernel_size + [2, 2],
            padding=padding,
            dilation_rate=dilation_rate,
        )

  @test_util.run_in_graph_and_eager_modes
  def testAtrousConvolution1D(self):
    """Sweeps padding, input width, kernel width and dilation rate in 1-D."""
    with self._delay_checks() as add_check:
      for padding, width, kernel_width, rate in itertools.product(
          ["SAME", "VALID"], [9, 10], range(1, 4), range(1, 4)):
        self._test_atrous_convolution(
            add_check=add_check,
            input_shape=[2, width, 2],
            filter_shape=[kernel_width, 2, 2],
            padding=padding,
            dilation_rate=[rate],
        )

  @test_util.run_in_graph_and_eager_modes
  def testAtrousConvolutionNC(self):
    """Checks the channels-first ("NCW" / "NCHW") data formats on GPU."""
    if not test.is_gpu_available(cuda_only=True):
      # Report a skip rather than passing without having exercised anything.
      self.skipTest(
          "NCW and NCHW formats are currently supported only on CUDA.")
    with test_util.device(use_gpu=True):
      with self._delay_checks() as add_check:
        for padding in ["SAME", "VALID"]:
          self._test_atrous_convolution(
              add_check=add_check,
              input_shape=[2, 2, 9],
              padding=padding,
              filter_shape=[3, 2, 2],
              dilation_rate=[2],
              data_format="NCW",
          )
          self._test_atrous_convolution(
              add_check=add_check,
              input_shape=[2, 2, 9, 5],
              padding=padding,
              filter_shape=[3, 3, 2, 2],
              dilation_rate=[2, 1],
              data_format="NCHW",
          )

  @test_util.run_in_graph_and_eager_modes
  def testAtrousSequence(self):
    """Tests optimization of sequence of atrous convolutions.

    See the documentation of with_space_to_batch.
    """
    with self._delay_checks() as add_check:
      for padding in ["SAME", "VALID"]:
        for height in range(15, 17):
          for width in range(15, 17):
            x_shape = [3, height, width, 2]
            x = np.random.random_sample(x_shape).astype(np.float32)

            kernel_sizes = [1, 3] if padding == "SAME" else range(1, 3)
            for kernel in kernel_sizes:
              f_shape = [kernel, kernel, 2, 2]
              f1 = 1e-2 * np.random.random_sample(f_shape).astype(np.float32)
              f2 = 1e-2 * np.random.random_sample(f_shape).astype(np.float32)

              def combined_op(converted_input, num_spatial_dims, padding_arg):  # pylint: disable=unused-argument
                # `padding_arg` is deliberately ignored: both convolutions use
                # the enclosing loop's `padding`, `f1` and `f2`.
                # pylint: disable=cell-var-from-loop
                result = nn_ops.convolution(
                    input=converted_input, filter=f1, padding=padding)
                result = nn_ops.convolution(
                    input=result, filter=f2, padding=padding)
                # pylint: enable=cell-var-from-loop
                return result

              for rate_height, rate_width in itertools.product(
                  range(2, 4), range(2, 4)):
                dilation_rate = [rate_height, rate_width]
                # Reference: two dilated convolutions applied back to back.
                y1 = nn_ops.convolution(
                    input=x,
                    filter=f1,
                    padding=padding,
                    dilation_rate=dilation_rate)
                y1 = nn_ops.convolution(
                    input=y1,
                    filter=f2,
                    padding=padding,
                    dilation_rate=dilation_rate)
                # Under test: both undilated convolutions run as one op
                # inside with_space_to_batch.
                y2 = nn_ops.with_space_to_batch(
                    input=x,
                    dilation_rate=dilation_rate,
                    op=combined_op,
                    padding="VALID")

                add_check(self._assert_outputs_close, y1, y2)

  def _test_gradient(self, x_shape, f_shape, dilation_rate, padding):
    """Asserts the gradient-checker error of a dilated convolution is < 1e-2.

    Args:
      x_shape: Shape of the randomly generated input.
      f_shape: Shape of the randomly generated filter.
      dilation_rate: Dilation rate passed to `nn_ops.convolution`.
      padding: Padding passed to `nn_ops.convolution`.
    """
    x_val = np.random.random_sample(x_shape).astype(np.float32)
    f_val = np.random.random_sample(f_shape).astype(np.float32)
    x = constant_op.constant(x_val, name="x", dtype=dtypes.float32)
    f = constant_op.constant(f_val, name="f", dtype=dtypes.float32)
    output = nn_ops.convolution(
        input=x, filter=f, dilation_rate=dilation_rate, padding=padding)
    y_shape = output.get_shape().as_list()
    err = gradient_checker.compute_gradient_error([x, f], [x_shape, f_shape],
                                                  output, y_shape)
    err_tolerance = 1e-2
    self.assertLess(err, err_tolerance)

  @test_util.run_v1_only("b/120545219")
  def testGradient(self):
    """Runs the gradient check over both paddings and rates 1 and 2."""
    with self.cached_session():
      for padding, rate_width, rate_height in itertools.product(
          ["SAME", "VALID"], range(1, 3), range(1, 3)):
        self._test_gradient(
            x_shape=[2, 5, 6, 2],
            f_shape=[3, 3, 2, 2],
            dilation_rate=[rate_height, rate_width],
            padding=padding)


if __name__ == "__main__":
  test.main()