# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for convolution related functionality in tensorflow.ops.nn."""
import numpy as np

from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.platform import test


class Conv1DTest(test.TestCase):

  def testBasic(self):
    """Test that argument passing to conv1d is handled properly."""
    # double datatype is currently not supported for convolution ops
    # on the ROCm platform
    optional_float64 = [] if test.is_built_with_rocm() else [dtypes.float64]
    for dtype in [dtypes.float16, dtypes.float32] + optional_float64:
      x = constant_op.constant([1, 2, 3, 4], dtype=dtype)
      x = array_ops.expand_dims(x, 0)  # Add batch dimension
      x = array_ops.expand_dims(x, 2)  # And depth dimension
      filters = constant_op.constant([2, 1], dtype=dtype)
      filters = array_ops.expand_dims(filters, 1)  # in_channels
      filters = array_ops.expand_dims(filters, 2)  # out_channels
      # Filters is 2x1x1
      for stride in [1, 2]:
        with self.cached_session(use_gpu=test.is_gpu_available()):
          c = nn_ops.conv1d(x, filters, stride, padding="VALID")
          reduced = array_ops.squeeze(c)
          output = self.evaluate(reduced)
          if stride == 1:
            # VALID conv of length-4 input with a length-2 filter yields 3
            # outputs: dot products of [2, 1] with each adjacent input pair.
            self.assertEqual(len(output), 3)
            self.assertAllClose(output,
                                [2 * 1 + 1 * 2, 2 * 2 + 1 * 3, 2 * 3 + 1 * 4])
          else:
            # Stride 2 skips every other window, leaving 2 outputs.
            self.assertEqual(len(output), 2)
            self.assertAllClose(output, [2 * 1 + 1 * 2, 2 * 3 + 1 * 4])

  def testExpandedBatch(self):
    """Test that conv1d handles inputs with an expanded batch shape [2, 1]."""
    x = constant_op.constant([1, 2, 3, 4], dtype=dtypes.float32)
    x = array_ops.expand_dims(x, 0)  # Add batch dimension
    x = array_ops.expand_dims(x, 2)  # And depth dimension
    x = array_ops.stack([x, x])  # Make batch shape [2, 1]
    filters = constant_op.constant([2, 1], dtype=dtypes.float32)
    filters = array_ops.expand_dims(filters, 1)  # in_channels
    filters = array_ops.expand_dims(filters, 2)  # out_channels
    # Filters is 2x1x1
    for stride in [1, 2]:
      with self.cached_session(use_gpu=test.is_gpu_available()):
        c = nn_ops.conv1d(x, filters, stride, padding="VALID")
        reduced = array_ops.squeeze(c)  # Squeeze out dims 1 and 3.
        output = self.evaluate(reduced)
        if stride == 1:
          # Both batch entries are identical copies of the testBasic input,
          # so each row matches the single-batch expected values.
          self.assertAllClose(output,
                              [[2 * 1 + 1 * 2, 2 * 2 + 1 * 3, 2 * 3 + 1 * 4],
                               [2 * 1 + 1 * 2, 2 * 2 + 1 * 3, 2 * 3 + 1 * 4]])
        else:
          self.assertAllClose(
              output,
              [[2 * 1 + 1 * 2, 2 * 3 + 1 * 4], [2 * 1 + 1 * 2, 2 * 3 + 1 * 4]])

  def testConv1DTranspose(self):
    """Test conv1d_transpose against a hand-computed reference output."""
    with self.cached_session():
      stride = 2

      # Input, output: [batch, width, depth]
      x_shape = [2, 4, 3]
      y_shape = [2, 9, 2]

      # Filter: [kernel_width, output_depth, input_depth]
      f_shape = [3, 2, 3]

      x = constant_op.constant(
          1.0, shape=x_shape, name="x", dtype=dtypes.float32)
      f = constant_op.constant(
          1.0, shape=f_shape, name="filter", dtype=dtypes.float32)
      output = nn_ops.conv1d_transpose(
          x, f, y_shape, strides=stride, padding="VALID")
      value = self.evaluate(output)

      # Build the expected output by hand: with all-ones input and filter,
      # each output location sums the filter taps that overlap an input.
      cache_values = np.zeros(y_shape, dtype=np.float32)

      # The amount of padding added
      pad = 1

      for n in range(x_shape[0]):
        for k in range(f_shape[1]):
          for w in range(pad, y_shape[1] - pad):
            target = 3.0
            # We add a case for locations divisible by the stride.
            w_in = w % stride == 0 and w > pad and w < y_shape[1] - 1 - pad
            if w_in:
              target += 3.0
            cache_values[n, w, k] = target

          # copy values in the border
          cache_values[n, 0, k] = cache_values[n, 1, k]
          cache_values[n, -1, k] = cache_values[n, -2, k]

      self.assertAllClose(cache_values, value)


if __name__ == "__main__":
  test.main()