# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for unified pooling functionality in tensorflow.ops.nn."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math

import numpy as np

from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import gradient_checker
from tensorflow.python.ops import nn_ops
import tensorflow.python.ops.nn_grad  # pylint: disable=unused-import
from tensorflow.python.platform import test


def pool_direct_single_axis(
    input,  # pylint: disable=redefined-builtin
    axis,
    window_size,
    pooling_type,
    padding,
    dilation_rate,
    stride):
  """Numpy implementation of pooling along a single axis.

  This is intended for testing only, and therefore isn't particularly
  efficient.

  See pool_direct below for the meaning of the arguments.

  Args:
    input: numpy array.
    axis: axis along which to perform pooling.
    window_size: int >= 1. Size of pooling window within axis.
    pooling_type: either "MAX" or "AVG".
    padding: either "SAME" or "VALID".
    dilation_rate: int >= 1. Dilation factor for window, i.e. stride at which
      to sample input.
    stride: int >= 1. Stride at which to generate output.

  Returns:
    pooling output array of rank N+2.

  Raises:
    ValueError: if arguments are invalid.
  """
  effective_window_size = (window_size - 1) * dilation_rate + 1
  input_size = input.shape[axis]
  if padding == "SAME":
    output_size = int(math.ceil(input_size / stride))
    total_padding_amount = max(
        0, (output_size - 1) * stride + effective_window_size - input_size)
    before_padding = total_padding_amount // 2
  elif padding == "VALID":
    output_size = int(
        math.ceil((input_size - effective_window_size + 1) / stride))
    before_padding = 0
  else:
    raise ValueError("Unsupported padding type: %r" % (padding,))

  output_shape = input.shape[:axis] + (output_size,) + input.shape[axis + 1:]
  output = np.zeros(output_shape, input.dtype)
  initial_dim_selector = tuple(np.s_[:] for _ in range(axis))
  if pooling_type == "MAX":
    pooling_func = np.max
  elif pooling_type == "AVG":
    pooling_func = np.mean
  else:
    raise ValueError("Unsupported pooling type: %r" % (pooling_type,))
  for output_pos in range(output_size):
    input_start_pos = output_pos * stride - before_padding
    input_end_pos = min(input_start_pos + effective_window_size, input_size)
    if input_start_pos < 0:
      # The leading window positions fall in the padding region; skip forward
      # by whole dilation steps to the first in-bounds position.
      input_start_pos %= dilation_rate
    input_slice = np.s_[input_start_pos:input_end_pos:dilation_rate]

    output[initial_dim_selector + (output_pos,)] = pooling_func(
        input[initial_dim_selector + (input_slice,)], axis=axis)
  return output
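

# A minimal illustrative sketch (not part of the original test suite; the
# helper name below is ours): a hand-checked call to pool_direct_single_axis.
# With window_size=2 and dilation_rate=2 the effective window spans 3 input
# elements, so "VALID" padding over a length-5 axis with stride 1 yields
# ceil((5 - 3 + 1) / 1) = 3 outputs, pooled over index pairs (0, 2), (1, 3)
# and (2, 4).
def _example_pool_direct_single_axis():
  """Returns MAX pooling of [[1, 2, 3, 4, 5]] along axis 1: [[3., 4., 5.]]."""
  x = np.array([[1., 2., 3., 4., 5.]])
  return pool_direct_single_axis(
      input=x,
      axis=1,
      window_size=2,
      pooling_type="MAX",
      padding="VALID",
      dilation_rate=2,
      stride=1)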


def pool_direct(
    input,  # pylint: disable=redefined-builtin
    window_shape,
    pooling_type,
    padding,
    dilation_rate,
    strides,
    data_format=None):
  """Numpy implementation of pooling.

  This is intended for testing only, and therefore isn't particularly
  efficient.

  See tensorflow.nn.pool.

  Args:
    input: numpy array of rank N+2.
    window_shape: Sequence of N ints >= 1.
    pooling_type: either "MAX" or "AVG".
    padding: either "SAME" or "VALID".
    dilation_rate: Sequence of N ints >= 1.
    strides: Sequence of N ints >= 1.
    data_format: If specified and starts with "NC", indicates that the second
      dimension, rather than the last dimension, specifies the channel.

  Returns:
    pooling output array of rank N+2.

  Raises:
    ValueError: if arguments are invalid.
  """
  if data_format is None or not data_format.startswith("NC"):
    spatial_start_dim = 1
  else:
    spatial_start_dim = 2
  output = input
  for i in range(len(window_shape)):
    output = pool_direct_single_axis(
        input=output,
        axis=i + spatial_start_dim,
        window_size=window_shape[i],
        pooling_type=pooling_type,
        padding=padding,
        dilation_rate=dilation_rate[i],
        stride=strides[i])
  return output
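

# A minimal illustrative sketch (not part of the original test suite; the
# helper name below is ours): pool_direct with "SAME" padding produces
# ceil(input_size / stride) outputs per spatial axis, and windows that extend
# past the boundary simply shrink rather than pool over padding values. For a
# length-4 axis with window 3 and stride 1, before_padding is 1, so the AVG
# outputs are [mean(1, 2), mean(1, 2, 3), mean(2, 3, 4), mean(3, 4)], i.e.
# [1.5, 2.0, 3.0, 3.5]. When data_format starts with "NC", the same pooling
# is instead applied to the spatial dimensions starting at index 2.
def _example_pool_direct_same_padding():
  """Returns AVG pooling of a [1, 4, 1] array with SAME padding."""
  x = np.arange(1., 5.).reshape(1, 4, 1)  # NWC layout holding values 1..4.
  return pool_direct(
      input=x,
      window_shape=[3],
      pooling_type="AVG",
      padding="SAME",
      dilation_rate=[1],
      strides=[1])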


class PoolingTest(test.TestCase):

  def _test(self, input_shape, **kwargs):
    # Use negative numbers to make sure there isn't any zero padding getting
    # used.
    x = -np.arange(
        np.prod(input_shape), dtype=np.float32).reshape(input_shape) - 1
    y1 = pool_direct(input=x, **kwargs)
    y2 = nn_ops.pool(input=x, **kwargs)
    self.assertAllClose(y1, y2.eval(), rtol=1e-2, atol=1e-2)

  def testPoolSimple(self):
    with self.test_session(use_gpu=test.is_gpu_available()):
      for padding in ["SAME", "VALID"]:
        for pooling_type in ["MAX", "AVG"]:
          self._test(
              input_shape=[1, 1, 10, 1],
              window_shape=[1, 3],
              padding=padding,
              pooling_type=pooling_type,
              dilation_rate=[1, 1],
              strides=[1, 2])

  def testPool1D(self):
    with self.test_session(use_gpu=test.is_gpu_available()):
      for padding in ["SAME", "VALID"]:
        for pooling_type in ["MAX", "AVG"]:
          for input_shape in [[2, 9, 2], [2, 10, 2]]:
            for window_shape in [[1], [2], [3]]:
              if padding != "SAME":
                for dilation_rate in [[1], [2], [3]]:
                  self._test(
                      input_shape=input_shape,
                      window_shape=window_shape,
                      padding=padding,
                      pooling_type=pooling_type,
                      dilation_rate=dilation_rate,
                      strides=[1])
              for strides in [[1], [2], [3]]:
                if np.any(np.array(strides) > window_shape):
                  continue
                self._test(
                    input_shape=input_shape,
                    window_shape=window_shape,
                    padding=padding,
                    pooling_type=pooling_type,
                    dilation_rate=[1],
                    strides=strides)

  def testPool2D(self):
    with self.test_session(use_gpu=test.is_gpu_available()):
      for padding in ["SAME", "VALID"]:
        for pooling_type in ["MAX", "AVG"]:
          for input_shape in [[2, 9, 10, 2], [2, 10, 9, 2]]:
            for window_shape in [[1, 1], [2, 1], [2, 3]]:
              if padding != "SAME":
                for dilation_rate in [[1, 1], [2, 1], [1, 2], [2, 3]]:
                  self._test(
                      input_shape=input_shape,
                      window_shape=window_shape,
                      padding=padding,
                      pooling_type=pooling_type,
                      dilation_rate=dilation_rate,
                      strides=[1, 1])
              for strides in [[1, 1], [2, 1], [1, 2], [2, 3]]:
                if np.any(np.array(strides) > window_shape):
                  continue
                self._test(
                    input_shape=input_shape,
                    window_shape=window_shape,
                    padding=padding,
                    pooling_type=pooling_type,
                    dilation_rate=[1, 1],
                    strides=strides)

  def testPool3D(self):
    with self.test_session(use_gpu=test.is_gpu_available()):
      for padding in ["SAME", "VALID"]:
        for pooling_type in ["MAX", "AVG"]:
          for input_shape in [[2, 9, 10, 11, 2], [2, 10, 9, 11, 2]]:
            for window_shape in [[1, 1, 1], [2, 1, 2], [2, 3, 2]]:
              if padding != "SAME":
                for dilation_rate in [[1, 1, 1], [2, 1, 2], [1, 2, 2],
                                      [2, 3, 3]]:
                  self._test(
                      input_shape=input_shape,
                      window_shape=window_shape,
                      padding=padding,
                      pooling_type=pooling_type,
                      dilation_rate=dilation_rate,
                      strides=[1, 1, 1])
              for strides in [[1, 1, 1], [2, 1, 2], [1, 2, 2], [2, 3, 3]]:
                if np.any(np.array(strides) > window_shape):
                  continue
                self._test(
                    input_shape=input_shape,
                    window_shape=window_shape,
                    padding=padding,
                    pooling_type=pooling_type,
                    dilation_rate=[1, 1, 1],
                    strides=strides)

  def testPoolNC(self):
    if test.is_gpu_available(cuda_only=True):
      # "NC*" format is currently only supported on CUDA.
      with self.test_session(use_gpu=True):
        for padding in ["SAME", "VALID"]:
          self._test(
              input_shape=[2, 2, 9],
              window_shape=[2],
              padding=padding,
              pooling_type="MAX",
              strides=[1],
              dilation_rate=[1],
              data_format="NCW")
          self._test(
              input_shape=[2, 2, 9],
              window_shape=[2],
              padding=padding,
              pooling_type="MAX",
              strides=[2],
              dilation_rate=[1],
              data_format="NCW")
          self._test(
              input_shape=[2, 2, 7, 9],
              window_shape=[2, 2],
              padding=padding,
              pooling_type="MAX",
              strides=[1, 2],
              dilation_rate=[1, 1],
              data_format="NCHW")
          self._test(
              input_shape=[2, 2, 7, 5, 3],
              window_shape=[2, 2, 2],
              padding=padding,
              pooling_type="MAX",
              strides=[1, 2, 1],
              dilation_rate=[1, 1, 1],
              data_format="NCDHW")
          self._test(
              input_shape=[2, 2, 7, 9],
              window_shape=[2, 2],
              padding="VALID",
              pooling_type="MAX",
              strides=[1, 1],
              dilation_rate=[2, 2],
              data_format="NCHW")

  def _test_gradient(self, input_shape, **kwargs):
    x_val = -np.arange(
        np.prod(input_shape), dtype=np.float32).reshape(input_shape) - 1
    x = constant_op.constant(x_val, name="x", dtype=dtypes.float32)
    output = nn_ops.pool(input=x, **kwargs)
    y_shape = output.get_shape().as_list()
    err = gradient_checker.compute_gradient_error(
        [x], [input_shape], output, y_shape, x_init_value=[x_val])
    err_tolerance = 1e-2
    self.assertLess(err, err_tolerance)

  def testGradient1D(self):
    with self.test_session(use_gpu=test.is_gpu_available()):
      for padding in ["SAME", "VALID"]:
        for pooling_type in ["AVG", "MAX"]:
          for input_shape in [[2, 5, 2], [1, 4, 1]]:
            for window_shape in [[1], [2]]:
              if padding != "SAME":
                for dilation_rate in [[1], [2]]:
                  self._test_gradient(
                      input_shape=input_shape,
                      window_shape=window_shape,
                      padding=padding,
                      pooling_type=pooling_type,
                      dilation_rate=dilation_rate,
                      strides=[1])
              for strides in [[1], [2]]:
                if np.any(np.array(strides) > window_shape):
                  continue
                self._test_gradient(
                    input_shape=input_shape,
                    window_shape=window_shape,
                    padding=padding,
                    pooling_type=pooling_type,
                    dilation_rate=[1],
                    strides=strides)

  def testGradient2D(self):
    with self.test_session(use_gpu=test.is_gpu_available()):
      for padding in ["SAME", "VALID"]:
        for pooling_type in ["AVG", "MAX"]:
          for input_shape in [[2, 4, 5, 2], [1, 5, 4, 1]]:
            for window_shape in [[1, 1], [2, 1], [2, 2]]:
              if padding != "SAME":
                for dilation_rate in [[1, 1], [2, 1], [2, 2]]:
                  self._test_gradient(
                      input_shape=input_shape,
                      window_shape=window_shape,
                      padding=padding,
                      pooling_type=pooling_type,
                      dilation_rate=dilation_rate,
                      strides=[1, 1])
              for strides in [[1, 1], [2, 1], [1, 2], [2, 2]]:
                if np.any(np.array(strides) > window_shape):
                  continue
                self._test_gradient(
                    input_shape=input_shape,
                    window_shape=window_shape,
                    padding=padding,
                    pooling_type=pooling_type,
                    dilation_rate=[1, 1],
                    strides=strides)

  def testGradient3D(self):
    with self.test_session(use_gpu=test.is_gpu_available()):
      for padding in ["SAME", "VALID"]:
        for pooling_type in ["AVG", "MAX"]:
          for input_shape in [[1, 3, 5, 4, 1], [1, 5, 4, 3, 1]]:
            for window_shape in [[1, 1, 1], [2, 1, 2], [2, 2, 2]]:
              if padding != "SAME":
                for dilation_rate in [[1, 1, 1], [2, 1, 2], [2, 2, 2]]:
                  self._test_gradient(
                      input_shape=input_shape,
                      window_shape=window_shape,
                      padding=padding,
                      pooling_type=pooling_type,
                      dilation_rate=dilation_rate,
                      strides=[1, 1, 1])
              for strides in [[1, 1, 1], [2, 1, 2], [2, 2, 2]]:
                if np.any(np.array(strides) > window_shape):
                  continue
                self._test_gradient(
                    input_shape=input_shape,
                    window_shape=window_shape,
                    padding=padding,
                    pooling_type=pooling_type,
                    dilation_rate=[1, 1, 1],
                    strides=strides)


if __name__ == "__main__":
  test.main()