1# Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Functional tests for SpacetoDepth op.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20 21import numpy as np 22 23from tensorflow.python.framework import constant_op 24from tensorflow.python.framework import dtypes 25from tensorflow.python.framework import errors 26from tensorflow.python.framework import ops 27from tensorflow.python.framework import test_util 28from tensorflow.python.ops import array_ops 29from tensorflow.python.ops import gen_array_ops 30from tensorflow.python.ops import gradient_checker_v2 31from tensorflow.python.ops import math_ops 32from tensorflow.python.platform import test 33from tensorflow.python.platform import tf_logging 34 35 36class SpaceToDepthTest(test.TestCase): 37 38 def _testOne(self, inputs, block_size, outputs, dtype=dtypes.float32): 39 input_nhwc = math_ops.cast(inputs, dtype) 40 with test_util.force_cpu(): 41 # test NHWC (default) on CPU 42 x_tf = array_ops.space_to_depth(input_nhwc, block_size) 43 self.assertAllEqual(self.evaluate(x_tf), outputs) 44 45 if test_util.is_gpu_available(): 46 with test_util.force_gpu(): 47 # test NHWC (default) on GPU 48 x_tf = array_ops.space_to_depth(input_nhwc, block_size) 49 self.assertAllEqual(self.evaluate(x_tf), outputs) 50 # test NCHW on GPU 51 input_nchw = test_util.NHWCToNCHW(input_nhwc) 52 output_nchw = array_ops.space_to_depth( 53 input_nchw, block_size, data_format="NCHW") 54 output_nhwc = test_util.NCHWToNHWC(output_nchw) 55 self.assertAllEqual(self.evaluate(output_nhwc), outputs) 56 57 def testBasic(self): 58 x_np = [[[[1], [2]], [[3], [4]]]] 59 block_size = 2 60 x_out = [[[[1, 2, 3, 4]]]] 61 for dtype in [dtypes.float32, dtypes.float16, dtypes.uint8]: 62 self._testOne(x_np, block_size, x_out, dtype=dtype) 63 64 65 # Tests for larger input dimensions. To make sure elements are 66 # correctly ordered spatially. 67 def testLargerInput2x2(self): 68 x_np = [[[[1], [2], [5], [6]], [[3], [4], [7], [8]], 69 [[9], [10], [13], [14]], [[11], [12], [15], [16]]]] 70 block_size = 2 71 x_out = [[[[1, 2, 3, 4], [5, 6, 7, 8]], [[9, 10, 11, 12], 72 [13, 14, 15, 16]]]] 73 self._testOne(x_np, block_size, x_out) 74 75 # Tests for larger input dimensions. To make sure elements are 76 # correctly ordered in depth. Here, larger block size. 77 def testLargerInput4x4(self): 78 x_np = [[[[1], [2], [5], [6]], [[3], [4], [7], [8]], 79 [[9], [10], [13], [14]], [[11], [12], [15], [16]]]] 80 block_size = 4 81 x_out = [[[[1, 2, 5, 6, 3, 4, 7, 8, 9, 10, 13, 14, 11, 12, 15, 16]]]] 82 self._testOne(x_np, block_size, x_out) 83 84 # Tests for larger input depths. 85 # To make sure elements are properly interleaved in depth. 86 def testDepthInterleaved(self): 87 x_np = [[[[1, 10], [2, 20]], [[3, 30], [4, 40]]]] 88 block_size = 2 89 x_out = [[[[1, 10, 2, 20, 3, 30, 4, 40]]]] 90 self._testOne(x_np, block_size, x_out) 91 92 # Tests for larger input depths. Here an odd depth. 93 # To make sure elements are properly interleaved in depth. 94 def testDepthInterleavedDepth3(self): 95 x_np = [[[[1, 2, 3], [4, 5, 6]], [[7, 8, 9], [10, 11, 12]]]] 96 block_size = 2 97 x_out = [[[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]]]] 98 self._testOne(x_np, block_size, x_out) 99 100 # Tests for larger input dimensions AND for larger input depths. 101 # To make sure elements are properly interleaved in depth and ordered 102 # spatially. 103 def testDepthInterleavedLarge(self): 104 x_np = [[[[1, 10], [2, 20], [5, 50], [6, 60]], 105 [[3, 30], [4, 40], [7, 70], [8, 80]], 106 [[9, 90], [10, 100], [13, 130], [14, 140]], 107 [[11, 110], [12, 120], [15, 150], [16, 160]]]] 108 block_size = 2 109 x_out = [[[[1, 10, 2, 20, 3, 30, 4, 40], [5, 50, 6, 60, 7, 70, 8, 80]], 110 [[9, 90, 10, 100, 11, 110, 12, 120], 111 [13, 130, 14, 140, 15, 150, 16, 160]]]] 112 self._testOne(x_np, block_size, x_out) 113 114 def testBlockSize2Batch10(self): 115 block_size = 2 116 117 def batch_input_elt(i): 118 return [[[1 * i], [2 * i], [5 * i], [6 * i]], 119 [[3 * i], [4 * i], [7 * i], [8 * i]], 120 [[9 * i], [10 * i], [13 * i], [14 * i]], 121 [[11 * i], [12 * i], [15 * i], [16 * i]]] 122 123 def batch_output_elt(i): 124 return [[[1 * i, 2 * i, 3 * i, 4 * i], [5 * i, 6 * i, 7 * i, 8 * i]], 125 [[9 * i, 10 * i, 11 * i, 12 * i], 126 [13 * i, 14 * i, 15 * i, 16 * i]]] 127 128 batch_size = 10 129 x_np = [batch_input_elt(i) for i in range(batch_size)] 130 x_out = [batch_output_elt(i) for i in range(batch_size)] 131 self._testOne(x_np, block_size, x_out) 132 133 def testBatchSize0(self): 134 block_size = 2 135 batch_size = 0 136 input_nhwc = array_ops.ones([batch_size, 4, 6, 3]) 137 x_out = array_ops.ones([batch_size, 2, 3, 12]) 138 139 with test_util.force_cpu(): 140 # test NHWC (default) on CPU 141 x_tf = array_ops.space_to_depth(input_nhwc, block_size) 142 self.assertAllEqual(x_tf.shape, x_out.shape) 143 self.evaluate(x_tf) 144 145 if test.is_gpu_available(): 146 with test_util.use_gpu(): 147 # test NHWC (default) on GPU 148 x_tf = array_ops.space_to_depth(input_nhwc, block_size) 149 self.assertAllEqual(x_tf.shape, x_out.shape) 150 self.evaluate(x_tf) 151 152 # Tests for different width and height. 153 def testNonSquare(self): 154 x_np = [[[[1, 10], [2, 20]], [[3, 30], [4, 40]], [[5, 50], [6, 60]], 155 [[7, 70], [8, 80]], [[9, 90], [10, 100]], [[11, 110], [12, 120]]]] 156 block_size = 2 157 x_out = [[[[1, 10, 2, 20, 3, 30, 4, 40]], [[5, 50, 6, 60, 7, 70, 8, 80]], 158 [[9, 90, 10, 100, 11, 110, 12, 120]]]] 159 self._testOne(x_np, block_size, x_out) 160 161 # Error handling: 162 163 def testInputWrongDimMissingDepth(self): 164 # The input is missing the last dimension ("depth") 165 x_np = [[[1, 2], [3, 4]]] 166 block_size = 2 167 with self.assertRaises((ValueError, errors.InvalidArgumentError)): 168 out_tf = array_ops.space_to_depth(x_np, block_size) 169 self.evaluate(out_tf) 170 171 def testInputWrongDimMissingBatch(self): 172 # The input is missing the first dimension ("batch") 173 x_np = [[[1], [2]], [[3], [4]]] 174 block_size = 2 175 with self.assertRaises((ValueError, errors.InvalidArgumentError)): 176 _ = array_ops.space_to_depth(x_np, block_size) 177 178 def testBlockSize0(self): 179 # The block size is 0. 180 x_np = [[[[1], [2]], [[3], [4]]]] 181 block_size = 0 182 with self.assertRaises((ValueError, errors.InvalidArgumentError)): 183 out_tf = array_ops.space_to_depth(x_np, block_size) 184 self.evaluate(out_tf) 185 186 def testBlockSizeOne(self): 187 # The block size is 1. The block size needs to be > 1. 188 x_np = [[[[1], [2]], [[3], [4]]]] 189 block_size = 1 190 with self.assertRaises((ValueError, errors.InvalidArgumentError)): 191 out_tf = array_ops.space_to_depth(x_np, block_size) 192 self.evaluate(out_tf) 193 194 def testBlockSizeLarger(self): 195 # The block size is too large for this input. 196 x_np = [[[[1], [2]], [[3], [4]]]] 197 block_size = 10 198 with self.assertRaises((ValueError, errors.InvalidArgumentError)): 199 out_tf = array_ops.space_to_depth(x_np, block_size) 200 self.evaluate(out_tf) 201 202 def testBlockSizeNotDivisibleWidth(self): 203 # The block size divides width but not height. 204 x_np = [[[[1], [2], [3]], [[3], [4], [7]]]] 205 block_size = 3 206 with self.assertRaises((ValueError, errors.InvalidArgumentError)): 207 _ = array_ops.space_to_depth(x_np, block_size) 208 209 def testBlockSizeNotDivisibleHeight(self): 210 # The block size divides height but not width. 211 x_np = [[[[1], [2]], [[3], [4]], [[5], [6]]]] 212 block_size = 3 213 with self.assertRaises((ValueError, errors.InvalidArgumentError)): 214 _ = array_ops.space_to_depth(x_np, block_size) 215 216 def testBlockSizeNotDivisibleBoth(self): 217 # The block size does not divide neither width or height. 218 x_np = [[[[1], [2]], [[3], [4]]]] 219 block_size = 3 220 with self.assertRaises((ValueError, errors.InvalidArgumentError)): 221 _ = array_ops.space_to_depth(x_np, block_size) 222 223 def testUnknownShape(self): 224 # Testing an unkown shape in graph. 225 with ops.Graph().as_default(): 226 t = array_ops.space_to_depth( 227 array_ops.placeholder(dtypes.float32), block_size=4) 228 self.assertEqual(4, t.get_shape().ndims) 229 230 def spaceToDepthUsingTranspose(self, tensor, block_size, data_format): 231 block_size_sq = block_size * block_size 232 233 dtype = tensor.dtype 234 if dtype == dtypes.qint8: 235 tensor = array_ops.bitcast(tensor, dtypes.int8) 236 237 if data_format == "NHWC": 238 b, ih, iw, ic = tensor.shape.as_list() 239 assert ih % block_size == 0, (ih, block_size) 240 assert iw % block_size == 0, (iw, block_size) 241 ow, oh, oc = iw // block_size, ih // block_size, ic * block_size_sq 242 tensor = array_ops.reshape(tensor, 243 [b, oh, block_size, ow, block_size, ic]) 244 tensor = array_ops.transpose(tensor, [0, 1, 3, 2, 4, 5]) 245 tensor = array_ops.reshape(tensor, [b, oh, ow, oc]) 246 elif data_format == "NCHW": 247 b, ic, ih, iw = tensor.shape.as_list() 248 assert ih % block_size == 0, (ih, block_size) 249 assert iw % block_size == 0, (iw, block_size) 250 ow, oh, oc = iw // block_size, ih // block_size, ic * block_size_sq 251 tensor = array_ops.reshape(tensor, 252 [b, ic, oh, block_size, ow, block_size]) 253 tensor = array_ops.transpose(tensor, [0, 3, 5, 1, 2, 4]) 254 tensor = array_ops.reshape(tensor, [b, oc, oh, ow]) 255 256 if dtype == dtypes.qint8: 257 tensor = array_ops.bitcast(tensor, dtype) 258 return tensor 259 260 def compareToTranspose(self, batch_size, out_height, out_width, in_channels, 261 block_size, data_format, data_type, use_gpu): 262 in_height = out_height * block_size 263 in_width = out_width * block_size 264 nhwc_input_shape = [batch_size, in_height, in_width, in_channels] 265 nchw_input_shape = [batch_size, in_channels, in_height, in_width] 266 total_size = np.prod(nhwc_input_shape) 267 268 # Construct the input tensor in data_type and NHWC. 269 # force_cpu is needed because quantize_v2 runs on only CPU. 270 with test_util.force_cpu(): 271 if data_type == dtypes.qint8: 272 # Initialize the input tensor with qint8 values that circle -127..127. 273 x = [((f + 128) % 255) - 127 for f in range(total_size)] 274 t = constant_op.constant( 275 x, shape=nhwc_input_shape, dtype=dtypes.float32) 276 t, _, _ = gen_array_ops.quantize_v2(t, -128.0, 127.0, dtypes.qint8) 277 else: 278 assert data_type == dtypes.float32 279 # Initialize the input tensor with ascending whole numbers as floats. 280 x = [f * 1.0 for f in range(total_size)] 281 shape = nchw_input_shape if data_format == "NCHW" else nhwc_input_shape 282 t = constant_op.constant(x, shape=shape, dtype=dtypes.float32) 283 284 with test_util.device(use_gpu): 285 if data_format == "NCHW_VECT_C": 286 assert data_type == dtypes.qint8 287 288 # Convert to int8, then NHWCToNCHW_VECT_C, and then back to qint8. 289 actual = array_ops.bitcast(t, dtypes.int8) 290 actual = test_util.NHWCToNCHW_VECT_C(actual) 291 actual = array_ops.bitcast(actual, dtypes.qint8) 292 actual = array_ops.space_to_depth( 293 actual, block_size, data_format=data_format) 294 actual = array_ops.bitcast(actual, dtypes.int8) 295 actual = test_util.NCHW_VECT_CToNHWC(actual) 296 actual = array_ops.bitcast(actual, dtypes.qint8) 297 298 expected = array_ops.bitcast(t, dtypes.int8) 299 expected = math_ops.cast(expected, dtypes.float32) 300 expected = self.spaceToDepthUsingTranspose(expected, block_size, "NHWC") 301 expected = math_ops.cast(expected, dtypes.int8) 302 expected = array_ops.bitcast(expected, dtypes.qint8) 303 else: 304 # Initialize the input tensor with ascending whole numbers as floats. 305 actual = array_ops.space_to_depth( 306 t, block_size, data_format=data_format) 307 expected = self.spaceToDepthUsingTranspose(t, block_size, data_format) 308 309 actual_vals, expected_vals = self.evaluate([actual, expected]) 310 self.assertTrue(np.array_equal(actual_vals, expected_vals)) 311 312 @test_util.disable_tfrt("b/169901260") 313 def testAgainstTranspose(self): 314 self.compareToTranspose(3, 2, 3, 1, 2, "NHWC", dtypes.float32, False) 315 self.compareToTranspose(1, 2, 3, 2, 2, "NHWC", dtypes.float32, False) 316 self.compareToTranspose(1, 2, 3, 2, 3, "NHWC", dtypes.float32, False) 317 318 self.compareToTranspose(3, 2, 3, 1, 2, "NHWC", dtypes.qint8, False) 319 self.compareToTranspose(1, 2, 3, 2, 2, "NHWC", dtypes.qint8, False) 320 self.compareToTranspose(1, 2, 3, 2, 3, "NHWC", dtypes.qint8, False) 321 322 if not test.is_gpu_available(): 323 tf_logging.info("skipping gpu tests since gpu not available") 324 return 325 326 self.compareToTranspose(3, 2, 3, 1, 2, "NHWC", dtypes.float32, True) 327 self.compareToTranspose(3, 2, 3, 2, 2, "NHWC", dtypes.float32, True) 328 self.compareToTranspose(3, 2, 3, 1, 2, "NCHW", dtypes.float32, True) 329 self.compareToTranspose(3, 2, 3, 2, 3, "NCHW", dtypes.float32, True) 330 self.compareToTranspose(5, 7, 11, 3, 2, "NCHW", dtypes.float32, True) 331 332 self.compareToTranspose(3, 2, 3, 4, 2, "NCHW_VECT_C", dtypes.qint8, True) 333 self.compareToTranspose(3, 2, 3, 8, 3, "NCHW_VECT_C", dtypes.qint8, True) 334 self.compareToTranspose(5, 7, 11, 12, 2, "NCHW_VECT_C", dtypes.qint8, True) 335 336 337class SpaceToDepthGradientTest(test.TestCase): 338 339 # Check the gradients. 340 def _checkGrad(self, x, block_size, data_format): 341 # NCHW is implemented for only GPU. 342 if data_format == "NCHW" and not test.is_gpu_available(): 343 return 344 345 assert 4 == x.ndim 346 347 def func(x): 348 return array_ops.space_to_depth(x, block_size, data_format=data_format) 349 350 with test_util.use_gpu(): 351 with self.cached_session(): 352 theoretical, numerical = gradient_checker_v2.compute_gradient( 353 func, [ops.convert_to_tensor(x)]) 354 self.assertAllClose(theoretical, numerical, rtol=1e-2, atol=1e-2) 355 356 # Tests a gradient for space_to_depth of x which is a four dimensional 357 # tensor of shape [b, h * block_size, w * block_size, d]. 358 def _compare(self, b, h, w, d, block_size, data_format): 359 block_size_sq = block_size * block_size 360 data = np.random.normal(0, 1, b * h * w * d * block_size_sq).astype( 361 np.float32) 362 if data_format == "NHWC": 363 x = data.reshape([b, h * block_size, w * block_size, d]) 364 else: 365 x = data.reshape([b, d, h * block_size, w * block_size]) 366 367 self._checkGrad(x, block_size, data_format) 368 369 # Don't use very large numbers as dimensions here as the result is tensor 370 # with cartesian product of the dimensions. 371 def testSmall(self): 372 block_size = 2 373 self._compare(1, 2, 3, 5, block_size, "NHWC") 374 self._compare(1, 2, 3, 5, block_size, "NCHW") 375 376 @test_util.run_deprecated_v1 377 def testSmall2(self): 378 block_size = 2 379 self._compare(2, 4, 3, 2, block_size, "NHWC") 380 self._compare(2, 4, 3, 2, block_size, "NCHW") 381 382 383if __name__ == "__main__": 384 test.main() 385