# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""CPU tests for nn.TimeDistributed wrapping several layers and operators.

Each test runs a layer once on a plain batch, tiles that batch six times along
a new time axis, runs the wrapped layer on the tiled input and checks that
every time step reproduces the plain result.
"""
import numpy as np
import pytest

import mindspore
import mindspore.context as context
import mindspore.nn as nn
import mindspore.ops as ops
from mindspore import Tensor

context.set_context(mode=context.GRAPH_MODE, device_target='CPU')

# Absolute tolerance used when the wrapped layer performs floating-point
# arithmetic (convolution, matmul); same bound the Conv2d test always used.
_FLOAT_ATOL = 1e-5


class TestTimeDistributed(nn.Cell):
    """Cell that forwards its input through an nn.TimeDistributed wrapper."""

    def __init__(self, cell, time_axis, reshape_with_axis=None):
        """Wrap `cell` in nn.TimeDistributed with the given axis settings."""
        super(TestTimeDistributed, self).__init__()
        self.time_distributed = nn.TimeDistributed(cell, time_axis, reshape_with_axis)

    def construct(self, inputs):
        """Return the output of the wrapped cell for `inputs`."""
        return self.time_distributed(inputs)


def _assert_steps_match(output, expected, time_axis=1, atol=None):
    """Assert that every slice of `output` along `time_axis` matches `expected`.

    Args:
        output: numpy array produced by the time-distributed cell.
        expected: numpy array produced by the plain cell for one time step.
        time_axis: axis of `output` that enumerates the time steps.
        atol: when None the slices must be exactly equal; otherwise the
            element-wise absolute difference must be strictly below `atol`.
    """
    for step in range(output.shape[time_axis]):
        actual = np.take(output, step, axis=time_axis)
        if atol is None:
            assert np.all(actual == expected)
        else:
            assert np.all(np.abs(actual - expected) < atol)


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_conv2d():
    """Conv2d wrapped with time_axis=1 and reshape_with_axis=0."""
    inputs = np.random.randint(0, 10, [32, 12, 10, 10])
    conv2d = nn.Conv2d(12, 24, 4, has_bias=False, weight_init='normal')
    output_expect = conv2d(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([32, 1, 12, 10, 10]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(conv2d, time_axis=1, reshape_with_axis=0)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect, atol=_FLOAT_ATOL)
    print("Conv2D layer wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_maxpool2d():
    """MaxPool2d wrapped with time_axis=1 and reshape_with_axis=0."""
    inputs = np.random.randint(0, 10, [32, 12, 10, 10])
    pool = nn.MaxPool2d(kernel_size=3, stride=1)
    output_expect = pool(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([32, 1, 12, 10, 10]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(pool, time_axis=1, reshape_with_axis=0)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect)
    print("MaxPooling2D layer wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_dense():
    """Dense wrapped with time_axis=1 and reshape_with_axis=0."""
    inputs = np.random.randint(0, 10, [32, 10])
    dense = nn.Dense(10, 6)
    output_expect = dense(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([32, 1, 10]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(dense, time_axis=1, reshape_with_axis=0)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect, atol=_FLOAT_ATOL)
    print("Dense layer wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_dense_pynative():
    """Dense wrapped with time_axis=1 and reshape_with_axis=0 in PyNative mode."""
    # The context is process-wide: remember the current mode and put it back
    # afterwards so tests collected after this one keep the module's mode.
    previous_mode = context.get_context("mode")
    context.set_context(mode=context.PYNATIVE_MODE, device_target='CPU')
    try:
        inputs = np.random.randint(0, 10, [32, 10])
        dense = nn.Dense(10, 6)
        output_expect = dense(Tensor(inputs, mindspore.float32)).asnumpy()
        inputs = inputs.reshape([32, 1, 10]).repeat(6, axis=1)
        time_distributed = TestTimeDistributed(dense, time_axis=1, reshape_with_axis=0)
        output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
        _assert_steps_match(output, output_expect, atol=_FLOAT_ATOL)
        print("Dense layer with pynative mode wrapped successful")
    finally:
        context.set_context(mode=previous_mode)


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_dense_with_reshape_axis_not_first():
    """Dense wrapped with time_axis=0 and reshape_with_axis=1."""
    inputs = np.random.randint(0, 10, [32, 10])
    dense = nn.Dense(10, 6)
    output_expect = dense(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([1, 32, 10]).repeat(6, axis=0)
    time_distributed = TestTimeDistributed(dense, time_axis=0, reshape_with_axis=1)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect, time_axis=0, atol=_FLOAT_ATOL)
    print("Dense layer wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_argmax():
    """Argmax operator wrapped with time_axis=1 and reshape_with_axis=0."""
    inputs = np.random.randint(0, 10, [3, 4])
    argmax = ops.Argmax(output_type=mindspore.int32, axis=1)
    output_expect = argmax(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([3, 1, 4]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(argmax, time_axis=1, reshape_with_axis=0)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect)
    print("Argmax op wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_flatten():
    """Flatten wrapped with time_axis=1 and reshape_with_axis=0."""
    inputs = np.random.randint(0, 10, [3, 4, 5])
    flatten = nn.Flatten()
    output_expect = flatten(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([3, 1, 4, 5]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(flatten, time_axis=1, reshape_with_axis=0)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect)
    print("Flatten op wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_conv2d_no_reshape_axis():
    """Conv2d wrapped with time_axis=1 and no reshape_with_axis."""
    inputs = np.random.randint(0, 10, [32, 12, 10, 10])
    conv2d = nn.Conv2d(12, 24, 4, has_bias=False, weight_init='normal')
    output_expect = conv2d(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([32, 1, 12, 10, 10]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(conv2d, time_axis=1)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect, atol=_FLOAT_ATOL)
    print("Conv2D layer with no reshape axis wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_maxpool2d_no_reshape_axis():
    """MaxPool2d wrapped with time_axis=1 and no reshape_with_axis."""
    inputs = np.random.randint(0, 10, [32, 12, 10, 10])
    pool = nn.MaxPool2d(kernel_size=3, stride=1)
    output_expect = pool(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([32, 1, 12, 10, 10]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(pool, time_axis=1)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect)
    print("MaxPooling2D layer with no reshape axis wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_dense_no_reshape_axis():
    """Dense wrapped with time_axis=1 and no reshape_with_axis."""
    inputs = np.random.randint(0, 10, [32, 10])
    dense = nn.Dense(10, 6)
    output_expect = dense(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([32, 1, 10]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(dense, time_axis=1)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect, atol=_FLOAT_ATOL)
    print("Dense layer with no reshape axis wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_argmax_no_reshape_axis():
    """Argmax operator wrapped with time_axis=1 and no reshape_with_axis."""
    inputs = np.random.randint(0, 10, [3, 4])
    argmax = ops.Argmax(output_type=mindspore.int32, axis=1)
    output_expect = argmax(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([3, 1, 4]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(argmax, time_axis=1)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect)
    print("Argmax op with no reshape axis wrapped successful")


@pytest.mark.level0
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_time_distributed_flatten_no_reshape_axis():
    """Flatten wrapped with time_axis=1 and no reshape_with_axis."""
    inputs = np.random.randint(0, 10, [3, 4, 5])
    flatten = nn.Flatten()
    output_expect = flatten(Tensor(inputs, mindspore.float32)).asnumpy()
    inputs = inputs.reshape([3, 1, 4, 5]).repeat(6, axis=1)
    time_distributed = TestTimeDistributed(flatten, time_axis=1)
    output = time_distributed(Tensor(inputs, mindspore.float32)).asnumpy()
    _assert_steps_match(output, output_expect)
    print("Flatten op with no reshape axis wrapped successful")